diff --git a/Cargo.lock b/Cargo.lock index bd57aafb75..5704dccafb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -767,7 +767,7 @@ dependencies = [ "df_operator", "log", "logger", - "meta_client_v2", + "meta_client", "query_engine", "server", "signal-hook", @@ -918,12 +918,13 @@ dependencies = [ "common_types 0.1.0", "common_util", "log", - "meta_client_v2", + "meta_client", "rust-fsm", "serde", "serde_derive", "serde_json", "snafu 0.6.10", + "table_engine", "tokio 1.20.1", ] @@ -2972,29 +2973,6 @@ dependencies = [ [[package]] name = "meta_client" version = "0.1.0" -dependencies = [ - "async-trait", - "catalog", - "ceresdbproto_deps", - "common_types 0.1.0", - "common_util", - "futures 0.3.21", - "grpcio 0.1.0", - "log", - "rand 0.7.3", - "reqwest 0.11.11", - "serde", - "serde_derive", - "serde_json", - "snafu 0.6.10", - "table_engine", - "tokio 1.20.1", - "url 2.2.2", -] - -[[package]] -name = "meta_client_v2" -version = "0.1.0" dependencies = [ "async-trait", "catalog", diff --git a/Cargo.toml b/Cargo.toml index d13557863b..833eeb52b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,6 @@ members = [ "grpcio", "interpreters", "meta_client", - "meta_client_v2", "proto", "query_engine", "server", @@ -60,7 +59,7 @@ common_util = { path = "common_util" } df_operator = { path = "df_operator" } log = "0.4" logger = { path = "components/logger" } -meta_client_v2 = { path = "meta_client_v2" } +meta_client = { path = "meta_client" } query_engine = { path = "query_engine" } server = { path = "server" } table_engine = { path = "table_engine" } diff --git a/cluster/Cargo.toml b/cluster/Cargo.toml index 773dc8cba6..9e6818feb1 100644 --- a/cluster/Cargo.toml +++ b/cluster/Cargo.toml @@ -14,10 +14,11 @@ catalog = { path = "../catalog" } common_types = { path = "../common_types" } common_util = { path = "../common_util" } log = "0.4" -meta_client_v2 = { path = "../meta_client_v2" } +meta_client = { path = "../meta_client" } rust-fsm = "0.6.0" serde = "1.0" serde_derive = "1.0" 
serde_json = "1.0.60" snafu = { version ="0.6.10", features = ["backtraces"]} +table_engine = { path = "../table_engine" } tokio = { version = "1.0", features = ["full"] } \ No newline at end of file diff --git a/cluster/src/cluster_impl.rs b/cluster/src/cluster_impl.rs index cb75b2aa1e..f1e8405daf 100644 --- a/cluster/src/cluster_impl.rs +++ b/cluster/src/cluster_impl.rs @@ -8,7 +8,7 @@ use std::{ use async_trait::async_trait; use common_util::runtime::{JoinHandle, Runtime}; use log::{error, info, warn}; -use meta_client_v2::{ +use meta_client::{ types::{ActionCmd, GetTablesRequest}, EventHandler, MetaClient, }; @@ -21,7 +21,7 @@ use tokio::{ use crate::{ config::ClusterConfig, table_manager::{ShardTableInfo, TableManager}, - Cluster, MetaClientFailure, Result, StartMetaClient, TableManipulator, + Cluster, ClusterTopologyRef, MetaClientFailure, Result, StartMetaClient, TableManipulator, }; /// ClusterImpl is an implementation of [`Cluster`] based [`MetaClient`]. @@ -190,7 +190,7 @@ impl Cluster for ClusterImpl { .await .context(StartMetaClient)?; - // start the backgroud loop for sending heartbeat. + // start the background loop for sending heartbeat. self.start_heartbeat_loop(); info!("Cluster has started"); @@ -221,4 +221,8 @@ impl Cluster for ClusterImpl { info!("Cluster has stopped"); Ok(()) } + + async fn fetch_topology(&self) -> Result { + todo!("fetch topology from the meta") + } } diff --git a/cluster/src/config.rs b/cluster/src/config.rs index 73345a22d2..9a48db7893 100644 --- a/cluster/src/config.rs +++ b/cluster/src/config.rs @@ -1,6 +1,6 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. 
-use meta_client_v2::{meta_impl::MetaClientConfig, types::NodeMetaInfo}; +use meta_client::{meta_impl::MetaClientConfig, types::NodeMetaInfo}; use serde_derive::Deserialize; #[derive(Default, Clone, Deserialize, Debug)] diff --git a/cluster/src/lib.rs b/cluster/src/lib.rs index 7ba4624714..cec77739c5 100644 --- a/cluster/src/lib.rs +++ b/cluster/src/lib.rs @@ -1,15 +1,18 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; +use common_types::{schema::TIMESTAMP_COLUMN, table::TableId}; use common_util::define_result; -pub use meta_client_v2::types::{ +pub use meta_client::types::{ AllocSchemaIdRequest, AllocSchemaIdResponse, AllocTableIdRequest, AllocTableIdResponse, DropTableRequest, GetTablesRequest, }; -use meta_client_v2::types::{ShardId, TableId}; +use meta_client::types::{ShardId, ShardInfo}; +use serde::Deserialize; use snafu::{Backtrace, Snafu}; +use table_engine::ANALYTIC_ENGINE_TYPE; pub mod cluster_impl; pub mod config; @@ -19,13 +22,13 @@ mod table_manager; #[snafu(visibility = "pub")] pub enum Error { #[snafu(display("Build meta client failed, err:{}.", source))] - BuildMetaClient { source: meta_client_v2::Error }, + BuildMetaClient { source: meta_client::Error }, #[snafu(display("Meta client start failed, err:{}.", source))] - StartMetaClient { source: meta_client_v2::Error }, + StartMetaClient { source: meta_client::Error }, #[snafu(display("Meta client execute failed, err:{}.", source))] - MetaClientFailure { source: meta_client_v2::Error }, + MetaClientFailure { source: meta_client::Error }, #[snafu(display( "Shard not found in current node, shard_id:{}.\nBacktrace:\n{}", @@ -40,9 +43,54 @@ pub enum Error { define_result!(Error); -pub type ClusterRef = Arc; +pub type TableName = String; +pub type SchemaName = String; +pub type ClusterRef = Arc; pub type TableManipulatorRef = Arc; +pub type ClusterTopologyRef = Arc; + +#[derive(Debug, 
Clone, Deserialize)] +#[serde(default)] +pub struct SchemaConfig { + pub auto_create_tables: bool, + pub default_engine_type: String, + pub default_timestamp_column_name: String, +} + +impl Default for SchemaConfig { + fn default() -> Self { + Self { + auto_create_tables: false, + default_engine_type: ANALYTIC_ENGINE_TYPE.to_string(), + default_timestamp_column_name: TIMESTAMP_COLUMN.to_string(), + } + } +} + +#[derive(Debug, Clone, Deserialize)] +pub struct Node { + pub addr: String, + pub port: u16, +} + +#[derive(Debug, Clone)] +pub struct TableNodeShards { + pub table_id: TableId, + pub node_shards: Vec, +} + +#[derive(Debug, Clone)] +pub struct NodeShard { + pub shard: ShardInfo, + pub node: Node, +} + +#[derive(Clone, Debug, Default)] +pub struct ClusterTopology { + pub schema_tables: HashMap>, + pub schema_configs: HashMap, +} #[async_trait] pub trait TableManipulator { @@ -66,5 +114,5 @@ pub trait TableManipulator { pub trait Cluster { async fn start(&self) -> Result<()>; async fn stop(&self) -> Result<()>; - // TODO: add more methods, such as provide the topology of the cluster. 
+ async fn fetch_topology(&self) -> Result; } diff --git a/cluster/src/table_manager.rs b/cluster/src/table_manager.rs index 0fa619f28b..f05ac5d2af 100644 --- a/cluster/src/table_manager.rs +++ b/cluster/src/table_manager.rs @@ -5,15 +5,11 @@ use std::{ sync::RwLock, }; -use meta_client_v2::types::{ - CreateTableCmd, SchemaId, ShardId, ShardInfo, ShardTables, TableId, TableInfo, -}; +use common_types::{schema::SchemaId, table::TableId}; +use meta_client::types::{CreateTableCmd, ShardId, ShardInfo, ShardTables, TableInfo}; use snafu::OptionExt; -use crate::{Result, ShardNotFound}; - -pub type TableName = String; -pub type SchemaName = String; +use crate::{Result, SchemaName, ShardNotFound, TableName}; #[derive(Debug, Clone)] #[allow(dead_code)] diff --git a/common_types/src/lib.rs b/common_types/src/lib.rs index a07ca1cdb0..692d45be5c 100644 --- a/common_types/src/lib.rs +++ b/common_types/src/lib.rs @@ -19,6 +19,7 @@ pub mod row; #[cfg(feature = "arrow_deps")] pub mod schema; pub mod string; +pub mod table; pub mod time; /// Sequence number diff --git a/common_types/src/schema.rs b/common_types/src/schema.rs index 5e7dfd5683..317edeb7bf 100644 --- a/common_types/src/schema.rs +++ b/common_types/src/schema.rs @@ -152,7 +152,8 @@ pub enum Error { }, } -// TODO(boyan) make these constants configurable +pub type SchemaId = u32; +// TODO: make these constants configurable pub const TSID_COLUMN: &str = "tsid"; pub const TIMESTAMP_COLUMN: &str = "timestamp"; diff --git a/common_types/src/table.rs b/common_types/src/table.rs new file mode 100644 index 0000000000..c74b8dbf42 --- /dev/null +++ b/common_types/src/table.rs @@ -0,0 +1,3 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. 
+ +pub type TableId = u64; diff --git a/docs/guides/src/dev/crate-deps.dot b/docs/guides/src/dev/crate-deps.dot index a265fa5386..2b28e8460a 100644 --- a/docs/guides/src/dev/crate-deps.dot +++ b/docs/guides/src/dev/crate-deps.dot @@ -37,7 +37,7 @@ digraph G { cluster -> analytic_engine cluster -> catalog - cluster -> meta_client_v2 + cluster -> meta_client interpreters -> catalog interpreters -> sql @@ -46,12 +46,6 @@ digraph G { interpreters -> query_engine interpreters -> arrow_deps - meta_client -> catalog - meta_client -> table_engine - - meta_client_v2 -> catalog - meta_client_v2 -> table_engine - query_engine -> arrow_deps query_engine -> sql query_engine -> table_engine diff --git a/docs/guides/src/dev/crate-deps.svg b/docs/guides/src/dev/crate-deps.svg index b2a918ea23..fdb029b1b1 100644 --- a/docs/guides/src/dev/crate-deps.svg +++ b/docs/guides/src/dev/crate-deps.svg @@ -1,433 +1,403 @@ - - + G - + arrow_deps - -arrow_deps + +arrow_deps analytic_engine - -analytic_engine + +analytic_engine analytic_engine->arrow_deps - - + + proto - -proto + +proto analytic_engine->proto - - + + table_engine - -table_engine + +table_engine analytic_engine->table_engine - - + + wal - -wal + +wal analytic_engine->wal - - + + - + table_engine->arrow_deps - - + + - + table_engine->proto - - + + catalog - -catalog + +catalog catalog->table_engine - - + + catalog_impls - -catalog_impls + +catalog_impls catalog_impls->table_engine - - + + catalog_impls->catalog - - + + system_catalog - -system_catalog + +system_catalog catalog_impls->system_catalog - - + + - + system_catalog->arrow_deps - - + + - + system_catalog->proto - - + + - + system_catalog->table_engine - - + + - + system_catalog->catalog - - + + cluster - -cluster + +cluster cluster->analytic_engine - - + + cluster->catalog - - + + - + -meta_client_v2 - -meta_client_v2 +meta_client + +meta_client - + -cluster->meta_client_v2 - - - - - -meta_client_v2->table_engine - - - - - -meta_client_v2->catalog - - 
+cluster->meta_client + + interpreters - -interpreters + +interpreters interpreters->arrow_deps - - + + interpreters->table_engine - - + + interpreters->catalog - - + + sql - -sql + +sql interpreters->sql - - + + df_operator - -df_operator + +df_operator interpreters->df_operator - - + + query_engine - -query_engine + +query_engine interpreters->query_engine - - + + - + sql->arrow_deps - - + + - + sql->table_engine - - + + - + sql->catalog - - + + - + sql->df_operator - - + + - + df_operator->arrow_deps - - + + - + query_engine->arrow_deps - - + + - + query_engine->table_engine - - + + - + query_engine->sql - - + + - + query_engine->df_operator - - - - - -meta_client - -meta_client - - - -meta_client->table_engine - - - - - -meta_client->catalog - - + + - + server - -server + +server - + server->arrow_deps - - + + - + server->analytic_engine - - + + - + server->table_engine - - + + - + server->catalog - - + + - + server->system_catalog - - + + + + + +server->meta_client + + - + server->interpreters - - + + - + server->sql - - + + - + server->df_operator - - + + - + server->query_engine - - - - - -server->meta_client - - + + - + ceresdb - -ceresdb + +ceresdb - + ceresdb->analytic_engine - - + + - + ceresdb->table_engine - - + + - + ceresdb->catalog - - + + - + ceresdb->catalog_impls - - + + - + ceresdb->df_operator - - + + - + ceresdb->query_engine - - + + - + ceresdb->server - - + + diff --git a/meta_client/Cargo.toml b/meta_client/Cargo.toml index 6a55e41ca7..1ffcc768d0 100644 --- a/meta_client/Cargo.toml +++ b/meta_client/Cargo.toml @@ -1,7 +1,6 @@ [package] name = "meta_client" version = "0.1.0" -authors = ["CeresDB Authors "] [package.edition] workspace = true @@ -11,13 +10,13 @@ workspace = true [dependencies] async-trait = "0.1.53" catalog = { path = "../catalog" } -common_types = { path = "../common_types" } -table_engine = { path = "../table_engine" } ceresdbproto_deps = { path = "../ceresdbproto_deps" } +common_types = { path = "../common_types" } 
common_util = { path = "../common_util" } futures = "0.3" grpcio = { path = "../grpcio" } log = "0.4" +protobuf = "2.20" rand = "0.7" reqwest = "0.11" serde = "1.0" diff --git a/meta_client/src/lib.rs b/meta_client/src/lib.rs index 3878293feb..bbbfe671fa 100644 --- a/meta_client/src/lib.rs +++ b/meta_client/src/lib.rs @@ -1,226 +1,135 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. -//! Client to communicate with meta - -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; use async_trait::async_trait; -use common_types::schema::TIMESTAMP_COLUMN; use common_util::define_result; -use serde_derive::Deserialize; use snafu::{Backtrace, Snafu}; -use table_engine::ANALYTIC_ENGINE_TYPE; - -use crate::static_client::StaticMetaClient; +use types::{ + ActionCmd, AllocSchemaIdRequest, AllocSchemaIdResponse, AllocTableIdRequest, + AllocTableIdResponse, DropTableRequest, GetTablesRequest, GetTablesResponse, ShardInfo, +}; -mod load_balance; -mod static_client; +pub mod meta_impl; +pub mod types; #[derive(Debug, Snafu)] #[snafu(visibility = "pub")] pub enum Error { #[snafu(display( - "Invalid node addr of cluster view, node:{}.\nBacktrace:\n{}", - node, + "Failed to fetch action cmd, err:{}.\nBacktrace:\n{}", + source, backtrace ))] - InvalidNodeAddr { node: String, backtrace: Backtrace }, + FetchActionCmd { + source: grpcio::Error, + backtrace: Backtrace, + }, #[snafu(display( - "Invalid node port of cluster view, node:{}, err:{}.\nBacktrace:\n{}", - node, + "Failed to init heatbeat stream, err:{}.\nBacktrace:\n{}", source, backtrace ))] - InvalidNodePort { - node: String, - source: std::num::ParseIntError, + InitHeartBeatStream { + source: grpcio::Error, backtrace: Backtrace, }, #[snafu(display( - "Failed to create schema:{}, catalog:{}, err:{}", - schema, - catalog, - source + "Failed to get grpc client, grpc client is not inited.\nBacktrace:\n{}", + backtrace ))] - FailOnChangeView { - schema: String, - catalog: String, + 
FailGetGrpcClient { backtrace: Backtrace }, + + #[snafu(display("Failed to send heartbeat, cluster:{}, err:{}", cluster, source))] + FailSendHeartbeat { + cluster: String, source: Box, }, - #[snafu(display("Failed to get catalog:{}, err:{}", catalog, source))] - FailGetCatalog { - catalog: String, + #[snafu(display("Failed to alloc schema id, err:{}", source))] + FailAllocSchemaId { source: Box, }, -} - -define_result!(Error); -type ShardViewMap = HashMap; - -#[async_trait] -pub trait MetaWatcher { - async fn on_change(&self, view: ClusterViewRef) -> Result<()>; -} - -pub type MetaWatcherPtr = Box; - -/// Meta client abstraction -#[async_trait] -pub trait MetaClient { - /// Start the meta client - async fn start(&self) -> Result<()>; - - /// Get current cluster view. - /// - /// The cluster view is updated by background workers periodically - fn get_cluster_view(&self) -> ClusterViewRef; -} + #[snafu(display("Failed to alloc table id, err:{}", source))] + FailAllocTableId { + source: Box, + }, -// TODO(yingwen): Now meta use i32 as shard id, maybe switch to unsigned number -pub type ShardId = i32; + #[snafu(display("Failed to drop table, err:{}", source))] + FailDropTable { + source: Box, + }, -#[derive(Debug, Clone, Deserialize)] -pub struct Node { - pub addr: String, - pub port: u32, -} + #[snafu(display("Failed to get tables, err:{}", source))] + FailGetTables { + source: Box, + }, -#[derive(Debug, Clone, Deserialize)] -pub struct ShardView { - pub shard_id: ShardId, - pub node: Node, -} + #[snafu(display( + "Meta rpc error, resp code:{}, msg:{}.\nBacktrace:\n{}", + code, + msg, + backtrace + ))] + MetaRpc { + code: u32, + msg: String, + backtrace: Backtrace, + }, -fn default_engine_type() -> String { - ANALYTIC_ENGINE_TYPE.to_string() + #[snafu(display( + "Handle event failed, handler:{}, event:{:?}, err:{}", + name, + event, + source + ))] + FailHandleEvent { + name: String, + event: ActionCmd, + source: Box, + }, } -#[derive(Debug, Clone, Deserialize)] 
-#[serde(default)] -pub struct SchemaConfig { - pub auto_create_tables: bool, - pub default_engine_type: String, - pub default_timestamp_column_name: String, -} +define_result!(Error); -impl Default for SchemaConfig { - fn default() -> Self { - Self { - auto_create_tables: false, - default_engine_type: default_engine_type(), - default_timestamp_column_name: default_timestamp_column_name(), - } - } -} +pub type EventHandlerRef = Arc; -impl From for SchemaConfig { - fn from(view: SchemaShardView) -> Self { - Self { - auto_create_tables: view.auto_create_tables, - default_engine_type: view.default_engine_type, - default_timestamp_column_name: view.default_timestamp_column_name, - } - } -} +#[async_trait] +pub trait EventHandler { + fn name(&self) -> &str; -#[derive(Clone, Debug, Default, Deserialize)] -pub struct ClusterView { - pub schema_shards: HashMap, - pub schema_configs: HashMap, + async fn handle( + &self, + event: &ActionCmd, + ) -> std::result::Result<(), Box>; } -pub type ClusterViewRef = Arc; - -#[derive(Clone, Debug, Deserialize)] -#[serde(default)] -pub struct MetaClientConfig { - pub cluster: String, - /// Local ip address of this node, used as endpoint ip in meta. - pub node: String, - /// Grpc port of this node, also used as endpoint port in meta. - pub port: u16, - /// The static cluster view used by static meta client. - pub cluster_view: ClusterViewConfig, -} +/// MetaClient is the abstraction of client used to communicate with CeresMeta +/// cluster. +#[async_trait] +pub trait MetaClient: Send + Sync { + /// Start the meta client and the events will occur afterwards. + async fn start(&self) -> Result<()>; + /// Stop the meta client and release all the resources. + async fn stop(&self) -> Result<()>; -impl Default for MetaClientConfig { - fn default() -> Self { - Self { - cluster: String::new(), - node: String::new(), - port: 8831, - cluster_view: ClusterViewConfig { - schema_shards: Vec::new(), - }, - } - } -} + /// Register handler for the event. 
+ /// + /// It is better to register handlers before calling `start`. + async fn register_event_handler(&self, handler: EventHandlerRef) -> Result<()>; -#[derive(Debug, Deserialize, Clone)] -#[serde(default)] -pub struct SchemaShardView { - schema: String, - auto_create_tables: bool, - pub default_engine_type: String, - default_timestamp_column_name: String, - shard_views: Vec, -} + async fn alloc_schema_id(&self, req: AllocSchemaIdRequest) -> Result; -impl Default for SchemaShardView { - fn default() -> Self { - Self { - schema: "".to_string(), - auto_create_tables: false, - default_engine_type: default_engine_type(), - default_timestamp_column_name: default_timestamp_column_name(), - shard_views: Vec::default(), - } - } -} + async fn alloc_table_id(&self, req: AllocTableIdRequest) -> Result; -#[inline] -fn default_timestamp_column_name() -> String { - TIMESTAMP_COLUMN.to_string() -} + async fn drop_table(&self, req: DropTableRequest) -> Result<()>; -#[derive(Debug, Deserialize, Clone)] -pub struct ClusterViewConfig { - schema_shards: Vec, -} + async fn get_tables(&self, req: GetTablesRequest) -> Result; -impl ClusterViewConfig { - pub(crate) fn to_cluster_view(&self) -> ClusterView { - let mut schema_configs = HashMap::with_capacity(self.schema_shards.len()); - let mut schema_shards = HashMap::with_capacity(self.schema_shards.len()); - - for schema_shard_view in self.schema_shards.clone() { - let schema = schema_shard_view.schema.clone(); - schema_shards.insert( - schema.clone(), - schema_shard_view - .shard_views - .iter() - .map(|shard| (shard.shard_id, shard.clone())) - .collect(), - ); - schema_configs.insert(schema, SchemaConfig::from(schema_shard_view)); - } - ClusterView { - schema_shards, - schema_configs, - } - } + async fn send_heartbeat(&self, req: Vec) -> Result<()>; } -/// Create a meta client with given `config`. 
-pub fn build_meta_client( - config: MetaClientConfig, - watcher: Option, -) -> Result> { - let meta_client = StaticMetaClient::new(config, watcher); - Ok(Arc::new(meta_client)) -} +pub type MetaClientRef = Arc; diff --git a/meta_client_v2/src/meta_impl.rs b/meta_client/src/meta_impl.rs similarity index 100% rename from meta_client_v2/src/meta_impl.rs rename to meta_client/src/meta_impl.rs diff --git a/meta_client/src/static_client.rs b/meta_client/src/static_client.rs deleted file mode 100644 index 8639100f53..0000000000 --- a/meta_client/src/static_client.rs +++ /dev/null @@ -1,86 +0,0 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. - -//! Static meta client. - -use std::{collections::HashMap, sync::Arc}; - -use async_trait::async_trait; -use log::info; - -use crate::{ - ClusterView, ClusterViewConfig, ClusterViewRef, MetaClient, MetaClientConfig, MetaWatcherPtr, - Node, Result, ShardView, -}; - -/// Static meta client. -pub struct StaticMetaClient { - cluster_view: ClusterViewRef, - watcher: Option, -} - -impl StaticMetaClient { - pub fn new(config: MetaClientConfig, watcher: Option) -> Self { - let cluster_view = match new_cluster_view(&config.cluster_view) { - Some(v) => v, - None => cluster_view_without_meta(&config.node, config.port), - }; - - Self { - cluster_view: Arc::new(cluster_view), - watcher, - } - } -} - -#[async_trait] -impl MetaClient for StaticMetaClient { - async fn start(&self) -> Result<()> { - info!( - "File meta client is starting, cluster_view:{:?}", - self.cluster_view - ); - - info!("File meta client invoke watcher"); - - if let Some(w) = &self.watcher { - w.on_change(self.cluster_view.clone()).await?; - } - - info!("File meta client has started"); - - Ok(()) - } - - fn get_cluster_view(&self) -> ClusterViewRef { - self.cluster_view.clone() - } -} - -fn new_cluster_view(config: &ClusterViewConfig) -> Option { - if config.schema_shards.is_empty() { - return None; - } - - Some(config.to_cluster_view()) -} - -fn 
cluster_view_without_meta(addr: &str, port: u16) -> ClusterView { - let shard_id = 0; - let mut static_shards = HashMap::new(); - static_shards.insert( - shard_id, - ShardView { - shard_id, - node: Node { - addr: addr.to_string(), - port: u32::from(port), - }, - }, - ); - let mut schema_shards = HashMap::new(); - schema_shards.insert(catalog::consts::DEFAULT_SCHEMA.to_string(), static_shards); - ClusterView { - schema_shards, - schema_configs: HashMap::default(), - } -} diff --git a/meta_client_v2/src/types.rs b/meta_client/src/types.rs similarity index 98% rename from meta_client_v2/src/types.rs rename to meta_client/src/types.rs index 98aeb62979..9e82ab1ae7 100644 --- a/meta_client_v2/src/types.rs +++ b/meta_client/src/types.rs @@ -6,12 +6,11 @@ use ceresdbproto_deps::ceresdbproto::{ cluster::ShardRole as PbShardRole, meta_service::{self, NodeHeartbeatResponse_oneof_cmd}, }; +use common_types::{schema::SchemaId, table::TableId}; use common_util::config::ReadableDuration; use serde_derive::Deserialize; -pub type TableId = u64; pub type ShardId = u32; -pub type SchemaId = u32; #[derive(Debug, Clone)] pub struct RequestHeader { @@ -116,7 +115,8 @@ pub enum ShardRole { FOLLOWER, } -// TODO: now some commands are empty and fill the concret inforamtion into them. +// TODO: now some commands are empty and fill the concrete information into +// them. 
#[derive(Debug, Clone)] pub enum ActionCmd { MetaNoneCmd(NoneCmd), diff --git a/meta_client_v2/Cargo.toml b/meta_client_v2/Cargo.toml deleted file mode 100644 index e0ccc95a30..0000000000 --- a/meta_client_v2/Cargo.toml +++ /dev/null @@ -1,27 +0,0 @@ -[package] -name = "meta_client_v2" -version = "0.1.0" - -[package.edition] -workspace = true - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -async-trait = "0.1.53" -catalog = { path = "../catalog" } -ceresdbproto_deps = { path = "../ceresdbproto_deps" } -common_types = { path = "../common_types" } -common_util = { path = "../common_util" } -futures = "0.3" -grpcio = { path = "../grpcio" } -log = "0.4" -protobuf = "2.20" -rand = "0.7" -reqwest = "0.11" -serde = "1.0" -serde_derive = "1.0.81" -serde_json = "1.0.60" -snafu = { version ="0.6.10", features = ["backtraces"]} -tokio = { version = "1.0", features = ["full"] } -url = "2.2" diff --git a/meta_client_v2/src/lib.rs b/meta_client_v2/src/lib.rs deleted file mode 100644 index bbbfe671fa..0000000000 --- a/meta_client_v2/src/lib.rs +++ /dev/null @@ -1,135 +0,0 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. 
- -use std::sync::Arc; - -use async_trait::async_trait; -use common_util::define_result; -use snafu::{Backtrace, Snafu}; -use types::{ - ActionCmd, AllocSchemaIdRequest, AllocSchemaIdResponse, AllocTableIdRequest, - AllocTableIdResponse, DropTableRequest, GetTablesRequest, GetTablesResponse, ShardInfo, -}; - -pub mod meta_impl; -pub mod types; - -#[derive(Debug, Snafu)] -#[snafu(visibility = "pub")] -pub enum Error { - #[snafu(display( - "Failed to fetch action cmd, err:{}.\nBacktrace:\n{}", - source, - backtrace - ))] - FetchActionCmd { - source: grpcio::Error, - backtrace: Backtrace, - }, - - #[snafu(display( - "Failed to init heatbeat stream, err:{}.\nBacktrace:\n{}", - source, - backtrace - ))] - InitHeartBeatStream { - source: grpcio::Error, - backtrace: Backtrace, - }, - - #[snafu(display( - "Failed to get grpc client, grpc client is not inited.\nBacktrace:\n{}", - backtrace - ))] - FailGetGrpcClient { backtrace: Backtrace }, - - #[snafu(display("Failed to send heartbeat, cluster:{}, err:{}", cluster, source))] - FailSendHeartbeat { - cluster: String, - source: Box, - }, - - #[snafu(display("Failed to alloc schema id, err:{}", source))] - FailAllocSchemaId { - source: Box, - }, - - #[snafu(display("Failed to alloc table id, err:{}", source))] - FailAllocTableId { - source: Box, - }, - - #[snafu(display("Failed to drop table, err:{}", source))] - FailDropTable { - source: Box, - }, - - #[snafu(display("Failed to get tables, err:{}", source))] - FailGetTables { - source: Box, - }, - - #[snafu(display( - "Meta rpc error, resp code:{}, msg:{}.\nBacktrace:\n{}", - code, - msg, - backtrace - ))] - MetaRpc { - code: u32, - msg: String, - backtrace: Backtrace, - }, - - #[snafu(display( - "Handle event failed, handler:{}, event:{:?}, err:{}", - name, - event, - source - ))] - FailHandleEvent { - name: String, - event: ActionCmd, - source: Box, - }, -} - -define_result!(Error); - -pub type EventHandlerRef = Arc; - -#[async_trait] -pub trait EventHandler { - fn 
name(&self) -> &str; - - async fn handle( - &self, - event: &ActionCmd, - ) -> std::result::Result<(), Box>; -} - -/// MetaClient is the abstraction of client used to communicate with CeresMeta -/// cluster. -#[async_trait] -pub trait MetaClient: Send + Sync { - /// Start the meta client and the events will occur afterwards. - async fn start(&self) -> Result<()>; - /// Stop the meta client and release all the resources. - async fn stop(&self) -> Result<()>; - - /// Register handler for the event. - /// - /// It is better to register handlers before calling `start`. - async fn register_event_handler(&self, handler: EventHandlerRef) -> Result<()>; - - async fn alloc_schema_id(&self, req: AllocSchemaIdRequest) -> Result; - - async fn alloc_table_id(&self, req: AllocTableIdRequest) -> Result; - - async fn drop_table(&self, req: DropTableRequest) -> Result<()>; - - async fn get_tables(&self, req: GetTablesRequest) -> Result; - - async fn send_heartbeat(&self, req: Vec) -> Result<()>; -} - -pub type MetaClientRef = Arc; diff --git a/meta_client_v2/src/load_balance.rs b/meta_client_v2/src/load_balance.rs deleted file mode 100644 index 707fb08d98..0000000000 --- a/meta_client_v2/src/load_balance.rs +++ /dev/null @@ -1,65 +0,0 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. - -//! 
Load balancer - -use common_util::define_result; -use rand::Rng; -use snafu::{Backtrace, Snafu}; - -#[derive(Debug, Snafu)] -pub enum Error { - #[snafu(display("Meta Addresses empty.\nBacktrace:\n{}", backtrace))] - MetaAddressesEmpty { backtrace: Backtrace }, -} - -define_result!(Error); - -pub trait LoadBalancer { - fn select<'a>(&self, addresses: &'a [String]) -> Result<&'a String>; -} - -pub struct RandomLoadBalancer; - -impl LoadBalancer for RandomLoadBalancer { - fn select<'a>(&self, addresses: &'a [String]) -> Result<&'a String> { - if addresses.is_empty() { - return MetaAddressesEmpty.fail(); - } - - let len = addresses.len(); - if len == 1 { - return Ok(&addresses[0]); - } - let mut rng = rand::thread_rng(); - let idx = rng.gen_range(0, len); - - Ok(&addresses[idx]) - } -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn test_random_loadbalancer() { - let lb = RandomLoadBalancer; - let addresses = vec![ - "127.0.0.1:8080".to_string(), - "127.0.0.2:8080".to_string(), - "127.0.0.3:8080".to_string(), - "127.0.0.4:8080".to_string(), - "127.0.0.5:8080".to_string(), - ]; - for _idx in 0..100 { - let addr = lb.select(&addresses).unwrap(); - assert!(addresses.contains(addr)); - } - - // Empty case - assert!(lb.select(&[]).is_err()); - - let addresses = ["127.0.0.1:5000".to_string()]; - assert_eq!(&addresses[0], lb.select(&addresses).unwrap()); - } -} diff --git a/server/src/config.rs b/server/src/config.rs index cc291804ef..b04b58f89b 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -2,13 +2,24 @@ //! 
Server configs +use std::collections::HashMap; + use analytic_engine; -use cluster::config::ClusterConfig; -use meta_client::MetaClientConfig; +use cluster::{config::ClusterConfig, Node, SchemaConfig}; +use common_types::schema::TIMESTAMP_COLUMN; +use meta_client::types::ShardId; use serde_derive::Deserialize; +use table_engine::ANALYTIC_ENGINE_TYPE; -use crate::router::RuleList; +use crate::route::rule_based::{ClusterView, RuleList}; +/// The deployment mode decides how to start the CeresDB. +/// +/// [DeployMode::Standalone] means to start one or multiple CeresDB instance(s) +/// alone without CeresMeta. +/// +/// [DeployMode::Cluster] means to start one or multiple CeresDB instance(s) +/// under the control of CeresMeta. #[derive(Debug, Clone, Copy, Deserialize)] pub enum DeployMode { Standalone, @@ -28,6 +39,81 @@ pub struct RuntimeConfig { pub background_thread_num: usize, } +#[derive(Clone, Debug, Default, Deserialize)] +#[serde(default)] +pub struct StaticRouteConfig { + pub rule_list: RuleList, + pub topology: StaticTopologyConfig, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct ShardView { + pub shard_id: ShardId, + pub node: Node, +} + +#[derive(Debug, Deserialize, Clone)] +#[serde(default)] +pub struct SchemaShardView { + pub schema: String, + pub auto_create_tables: bool, + pub default_engine_type: String, + pub default_timestamp_column_name: String, + pub shard_views: Vec, +} + +impl Default for SchemaShardView { + fn default() -> Self { + Self { + schema: "".to_string(), + auto_create_tables: false, + default_engine_type: ANALYTIC_ENGINE_TYPE.to_string(), + default_timestamp_column_name: TIMESTAMP_COLUMN.to_string(), + shard_views: Vec::default(), + } + } +} + +impl From for SchemaConfig { + fn from(view: SchemaShardView) -> Self { + Self { + auto_create_tables: view.auto_create_tables, + default_engine_type: view.default_engine_type, + default_timestamp_column_name: view.default_timestamp_column_name, + } + } +} + +#[derive(Debug, Default, 
Deserialize, Clone)] +#[serde(default)] +pub struct StaticTopologyConfig { + schema_shards: Vec, +} + +impl From<&StaticTopologyConfig> for ClusterView { + fn from(config: &StaticTopologyConfig) -> Self { + let mut schema_configs = HashMap::with_capacity(config.schema_shards.len()); + let mut schema_shards = HashMap::with_capacity(config.schema_shards.len()); + + for schema_shard_view in config.schema_shards.clone() { + let schema = schema_shard_view.schema.clone(); + schema_shards.insert( + schema.clone(), + schema_shard_view + .shard_views + .iter() + .map(|shard| (shard.shard_id, shard.node.clone())) + .collect(), + ); + schema_configs.insert(schema, SchemaConfig::from(schema_shard_view)); + } + ClusterView { + schema_shards, + schema_configs, + } + } +} + // TODO(yingwen): Split config into several sub configs. #[derive(Clone, Debug, Deserialize)] #[serde(default)] @@ -52,15 +138,15 @@ pub struct Config { pub tracing_log_name: String, pub tracing_level: String, - // Meta client related configs: - pub meta_client: MetaClientConfig, - // Config of router. - pub route_rules: RuleList, + // Config of static router. 
+ pub static_route: StaticRouteConfig, // Analytic engine configs: pub analytic: analytic_engine::Config, - pub cluster: ClusterConfig, + + // Deployment configs: pub deploy_mode: DeployMode, + pub cluster: ClusterConfig, } impl Default for RuntimeConfig { @@ -90,15 +176,10 @@ impl Default for Config { tracing_log_dir: String::from("/tmp/ceresdb"), tracing_log_name: String::from("tracing"), tracing_level: String::from("info"), - meta_client: MetaClientConfig { - node: String::from("127.0.0.1"), - port: grpc_port, - ..Default::default() - }, - route_rules: RuleList::default(), + static_route: StaticRouteConfig::default(), analytic: analytic_engine::Config::default(), - cluster: ClusterConfig::default(), deploy_mode: DeployMode::Standalone, + cluster: ClusterConfig::default(), } } } diff --git a/server/src/grpc/mod.rs b/server/src/grpc/mod.rs index 02c2e53906..2e64d8e308 100644 --- a/server/src/grpc/mod.rs +++ b/server/src/grpc/mod.rs @@ -8,8 +8,6 @@ use std::{ time::Instant, }; -use async_trait::async_trait; -use catalog::{consts as catalogConst, manager::ManagerRef}; use ceresdbproto_deps::ceresdbproto::{ common::ResponseHeader, prometheus::{PrometheusQueryRequest, PrometheusQueryResponse}, @@ -19,6 +17,7 @@ use ceresdbproto_deps::ceresdbproto::{ }, storage_grpc::{self, StorageService}, }; +use cluster::SchemaConfig; use common_types::{ column_schema::{self, ColumnSchema}, datum::DatumKind, @@ -31,10 +30,6 @@ use grpcio::{ ServerStreamingSink, UnarySink, WriteFlags, }; use log::{error, info}; -use meta_client::{ - ClusterViewRef, FailGetCatalog, FailOnChangeView, MetaClient, MetaClientConfig, MetaWatcher, - SchemaConfig, -}; use query_engine::executor::Executor as QueryExecutor; use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu}; use sql::plan::CreateTablePlan; @@ -46,7 +41,8 @@ use crate::{ error::{ErrNoCause, ErrWithCause, Result as ServerResult, ServerError, StatusCode}, grpc::metrics::GRPC_HANDLER_DURATION_HISTOGRAM_VEC, instance::InstanceRef, - 
router::{Router, RouterRef, RuleBasedRouter, RuleList}, + route::{Router, RouterRef}, + schema_config_provider::{self, SchemaConfigProviderRef}, }; mod metrics; @@ -67,12 +63,6 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Failed to build meta client, err:{}", source))] - BuildMetaClient { source: meta_client::Error }, - - #[snafu(display("Failed to start meta client, err:{}", source))] - StartMetaClient { source: meta_client::Error }, - #[snafu(display("Missing meta client config.\nBacktrace:\n{}", backtrace))] MissingMetaClientConfig { backtrace: Backtrace }, @@ -85,6 +75,12 @@ pub enum Error { #[snafu(display("Missing instance.\nBacktrace:\n{}", backtrace))] MissingInstance { backtrace: Backtrace }, + #[snafu(display("Missing router.\nBacktrace:\n{}", backtrace))] + MissingRouter { backtrace: Backtrace }, + + #[snafu(display("Missing schema config provider.\nBacktrace:\n{}", backtrace))] + MissingSchemaConfigProvider { backtrace: Backtrace }, + #[snafu(display("Catalog name is not utf8.\nBacktrace:\n{}", backtrace))] ParseCatalogName { source: std::string::FromUtf8Error, @@ -127,6 +123,11 @@ pub enum Error { source: grpcio::Error, backtrace: Backtrace, }, + + #[snafu(display("Get schema config failed, err:{}", source))] + GetSchemaConfig { + source: schema_config_provider::Error, + }, } const STREAM_QUERY_CHANNEL_LEN: usize = 20; @@ -171,7 +172,7 @@ impl<'a, Q> HandlerContext<'a, Q> { header: RequestHeader, router: Arc, instance: InstanceRef, - cluster_view: &'a ClusterViewRef, + schema_config_provider: &'a SchemaConfigProviderRef, ) -> Result { let default_catalog = instance.catalog_manager.default_catalog_name(); let default_schema = instance.catalog_manager.default_schema_name(); @@ -190,7 +191,9 @@ impl<'a, Q> HandlerContext<'a, Q> { .context(ParseSchemaName)? 
.unwrap_or_else(|| default_schema.to_string()); - let schema_config = cluster_view.schema_configs.get(&schema); + let schema_config = schema_config_provider + .schema_config(&schema) + .context(GetSchemaConfig)?; Ok(Self { header, @@ -217,15 +220,11 @@ impl<'a, Q> HandlerContext<'a, Q> { pub struct RpcServices { /// The grpc server rpc_server: Server, - /// Meta client - meta_client: Arc, } impl RpcServices { /// Start the rpc services pub async fn start(&mut self) -> Result<()> { - self.meta_client.start().await.context(StartMetaClient)?; - self.rpc_server.start(); for (host, port) in self.rpc_server.bind_addrs() { info!("Grpc server listening on {}:{}", host, port); @@ -242,11 +241,11 @@ impl RpcServices { pub struct Builder { bind_addr: String, port: u16, - meta_client_config: Option, env: Option>, runtimes: Option>, instance: Option>, - route_rules: RuleList, + router: Option, + schema_config_provider: Option, } impl Builder { @@ -254,11 +253,11 @@ impl Builder { Self { bind_addr: String::from("0.0.0.0"), port: 38081, - meta_client_config: None, env: None, runtimes: None, instance: None, - route_rules: RuleList::default(), + router: None, + schema_config_provider: None, } } @@ -272,11 +271,6 @@ impl Builder { self } - pub fn meta_client_config(mut self, config: MetaClientConfig) -> Self { - self.meta_client_config = Some(config); - self - } - pub fn env(mut self, env: Arc) -> Self { self.env = Some(env); self @@ -292,30 +286,31 @@ impl Builder { self } - pub fn route_rules(mut self, route_rules: RuleList) -> Self { - self.route_rules = route_rules; + pub fn router(mut self, router: RouterRef) -> Self { + self.router = Some(router); + self + } + + pub fn schema_config_provider(mut self, provider: SchemaConfigProviderRef) -> Self { + self.schema_config_provider = Some(provider); self } } impl Builder { pub fn build(self) -> Result { - let meta_client_config = self.meta_client_config.context(MissingMetaClientConfig)?; let runtimes = 
self.runtimes.context(MissingRuntimes)?; let instance = self.instance.context(MissingInstance)?; + let router = self.router.context(MissingRouter)?; + let schema_config_provider = self + .schema_config_provider + .context(MissingSchemaConfigProvider)?; - let watcher = Box::new(SchemaWatcher { - catalog_manager: instance.catalog_manager.clone(), - }); - - let meta_client = meta_client::build_meta_client(meta_client_config, Some(watcher)) - .context(BuildMetaClient)?; - let router = Arc::new(RuleBasedRouter::new(meta_client.clone(), self.route_rules)); let storage_service = StorageServiceImpl { router, instance, runtimes, - meta_client: meta_client.clone(), + schema_config_provider, }; let rpc_service = storage_grpc::create_storage_service(storage_service); @@ -327,41 +322,7 @@ impl Builder { .build() .context(BuildRpcServer)?; - Ok(RpcServices { - rpc_server, - meta_client, - }) - } -} - -struct SchemaWatcher { - catalog_manager: ManagerRef, -} - -#[async_trait] -impl MetaWatcher for SchemaWatcher { - async fn on_change(&self, view: ClusterViewRef) -> meta_client::Result<()> { - for schema in view.schema_shards.keys() { - let default_catalog = catalogConst::DEFAULT_CATALOG; - if let Some(catalog) = self - .catalog_manager - .catalog_by_name(default_catalog) - .map_err(|e| Box::new(e) as _) - .context(FailGetCatalog { - catalog: default_catalog, - })? 
- { - catalog - .create_schema(schema) - .await - .map_err(|e| Box::new(e) as _) - .context(FailOnChangeView { - schema, - catalog: default_catalog, - })?; - } - } - Ok(()) + Ok(RpcServices { rpc_server }) } } @@ -384,7 +345,7 @@ struct StorageServiceImpl { router: Arc, instance: InstanceRef, runtimes: Arc, - meta_client: Arc, + schema_config_provider: SchemaConfigProviderRef, } impl Clone for StorageServiceImpl { @@ -393,7 +354,7 @@ impl Clone for StorageServiceImpl { router: self.router.clone(), instance: self.instance.clone(), runtimes: self.runtimes.clone(), - meta_client: self.meta_client.clone(), + schema_config_provider: self.schema_config_provider.clone(), } } } @@ -416,16 +377,17 @@ macro_rules! handle_request { _ => &self.runtimes.bg_runtime, }; - let cluster_view = self.meta_client.get_cluster_view(); + let schema_config_provider = self.schema_config_provider.clone(); // we need to pass the result via channel runtime.spawn( async move { - let handler_ctx = HandlerContext::new(header, router, instance, &cluster_view) - .map_err(|e| Box::new(e) as _) - .context(ErrWithCause { - code: StatusCode::InvalidArgument, - msg: "Invalid header", - })?; + let handler_ctx = + HandlerContext::new(header, router, instance, &schema_config_provider) + .map_err(|e| Box::new(e) as _) + .context(ErrWithCause { + code: StatusCode::InvalidArgument, + msg: "Invalid header", + })?; $mod_name::$handle_fn(&handler_ctx, req).await.map_err(|e| { error!( "Failed to handle request, mod:{}, handler:{}, err:{}", @@ -514,11 +476,11 @@ impl StorageService for StorageServiceImpl { let router = self.router.clone(); let header = RequestHeader::from(ctx.request_headers()); let instance = self.instance.clone(); - let cluster_view = self.meta_client.get_cluster_view(); + let schema_config_provider = self.schema_config_provider.clone(); let (tx, rx) = oneshot::channel(); self.runtimes.write_runtime.spawn(async move { - let handler_ctx = HandlerContext::new(header, router, instance, 
&cluster_view) + let handler_ctx = HandlerContext::new(header, router, instance, &schema_config_provider) .map_err(|e| Box::new(e) as _) .context(ErrWithCause { code: StatusCode::InvalidArgument, @@ -610,10 +572,10 @@ impl StorageService for StorageServiceImpl { let router = self.router.clone(); let header = RequestHeader::from(ctx.request_headers()); let instance = self.instance.clone(); - let cluster_view = self.meta_client.get_cluster_view(); + let schema_config_provider = self.schema_config_provider.clone(); let (tx, mut rx) = tokio::sync::mpsc::channel(STREAM_QUERY_CHANNEL_LEN); self.runtimes.read_runtime.spawn(async move { - let handler_ctx = HandlerContext::new(header, router, instance, &cluster_view) + let handler_ctx = HandlerContext::new(header, router, instance, &schema_config_provider) .map_err(|e| Box::new(e) as _) .context(ErrWithCause { code: StatusCode::InvalidArgument, @@ -887,8 +849,8 @@ mod tests { use ceresdbproto_deps::ceresdbproto::storage::{ Field, FieldGroup, Tag, Value, WriteEntry, WriteMetric, }; + use cluster::SchemaConfig; use common_types::datum::DatumKind; - use meta_client::SchemaConfig; use super::*; diff --git a/server/src/grpc/route.rs b/server/src/grpc/route.rs index 2d4aedc0d0..0d03cfe743 100644 --- a/server/src/grpc/route.rs +++ b/server/src/grpc/route.rs @@ -9,7 +9,7 @@ use ceresdbproto_deps::ceresdbproto::storage::{RouteRequest, RouteResponse}; use crate::{ error::Result, grpc::{self, HandlerContext}, - router::Router, + route::Router, }; pub async fn handle_route( diff --git a/server/src/lib.rs b/server/src/lib.rs index fdd355a7db..464c5d3c8e 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -21,6 +21,7 @@ pub mod limiter; pub mod logger; mod metrics; mod mysql; -mod router; +pub mod route; +pub mod schema_config_provider; pub mod server; pub mod table_engine; diff --git a/server/src/mysql/writer.rs b/server/src/mysql/writer.rs index 361f615f6e..dc7fd02375 100644 --- a/server/src/mysql/writer.rs +++ 
b/server/src/mysql/writer.rs @@ -89,7 +89,7 @@ impl<'a, W: std::io::Write> MysqlQueryResultWriter<'a, W> { } fn make_column_by_field(column: &ResponseColumn) -> Column { - let column_type = conver_datum_kind_type(&column.data_type); + let column_type = convert_datum_kind_type(&column.data_type); Column { table: "".to_string(), column: column.name.clone(), @@ -98,7 +98,7 @@ fn make_column_by_field(column: &ResponseColumn) -> Column { } } -fn conver_datum_kind_type(data_type: &DatumKind) -> ColumnType { +fn convert_datum_kind_type(data_type: &DatumKind) -> ColumnType { match data_type { DatumKind::Timestamp => ColumnType::MYSQL_TYPE_LONG, DatumKind::Double => ColumnType::MYSQL_TYPE_DOUBLE, diff --git a/server/src/route/cluster_based.rs b/server/src/route/cluster_based.rs new file mode 100644 index 0000000000..e488c2284d --- /dev/null +++ b/server/src/route/cluster_based.rs @@ -0,0 +1,26 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +//! A router based on the [`cluster::Cluster`]. + +#![allow(dead_code)] + +use ceresdbproto_deps::ceresdbproto::storage::{Route, RouteRequest}; +use cluster::ClusterRef; + +use crate::{error::Result, route::Router}; + +pub struct ClusterBasedRouter { + cluster: ClusterRef, +} + +impl ClusterBasedRouter { + pub fn new(cluster: ClusterRef) -> Self { + Self { cluster } + } +} + +impl Router for ClusterBasedRouter { + fn route(&self, _schema: &str, _req: RouteRequest) -> Result> { + todo!(); + } +} diff --git a/server/src/route/mod.rs b/server/src/route/mod.rs new file mode 100644 index 0000000000..424e4d662b --- /dev/null +++ b/server/src/route/mod.rs @@ -0,0 +1,19 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. 
+ +use std::sync::Arc; + +use ceresdbproto_deps::ceresdbproto::storage::{Route, RouteRequest}; + +use crate::error::Result; + +pub mod cluster_based; +pub mod rule_based; + +pub use cluster_based::ClusterBasedRouter; +pub use rule_based::{RuleBasedRouter, RuleList}; + +pub type RouterRef = Arc; + +pub trait Router { + fn route(&self, schema: &str, req: RouteRequest) -> Result>; +} diff --git a/server/src/router.rs b/server/src/route/rule_based.rs similarity index 82% rename from server/src/router.rs rename to server/src/route/rule_based.rs index 4bd39a9abf..0df51d191f 100644 --- a/server/src/router.rs +++ b/server/src/route/rule_based.rs @@ -1,27 +1,34 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +//! A router based on rules. + use std::{ collections::HashMap, hash::{Hash, Hasher}, - sync::Arc, }; use ceresdbproto_deps::ceresdbproto::storage::{Endpoint, Route, RouteRequest}; +use cluster::{Node, SchemaConfig}; use log::info; -use meta_client::{MetaClient, ShardId}; +use meta_client::types::ShardId; use serde_derive::Deserialize; use twox_hash::XxHash64; -use crate::error::{ErrNoCause, Result, StatusCode}; +use crate::{ + error::{ErrNoCause, Result, StatusCode}, + route::Router, +}; /// Hash seed to build hasher. Modify the seed will result in different route /// result! 
const HASH_SEED: u64 = 0; -pub type RouterRef = Arc; +pub type ShardNodes = HashMap; -pub trait Router { - fn route(&self, schema: &str, req: RouteRequest) -> Result>; +#[derive(Clone, Debug, Default)] +pub struct ClusterView { + pub schema_shards: HashMap, + pub schema_configs: HashMap, } #[derive(Clone, Debug, Deserialize)] @@ -43,6 +50,7 @@ pub struct HashRule { } #[derive(Clone, Debug, Default, Deserialize)] +#[serde(default)] pub struct RuleList { pub prefix_rules: Vec, pub hash_rules: Vec, @@ -82,19 +90,22 @@ impl RuleList { type SchemaRules = HashMap; pub struct RuleBasedRouter { - meta_client: Arc, + cluster_view: ClusterView, schema_rules: SchemaRules, } impl RuleBasedRouter { - pub fn new(meta_client: Arc, rules: RuleList) -> Self { + pub fn new(cluster_view: ClusterView, rules: RuleList) -> Self { let schema_rules = rules.split_by_schema(); - info!("RuleBasedRouter init with rules, rules:{:?}", schema_rules); + info!( + "RuleBasedRouter init with rules, rules:{:?}, cluster_view:{:?}", + schema_rules, cluster_view + ); Self { - meta_client, schema_rules, + cluster_view, } } @@ -140,12 +151,11 @@ impl RuleBasedRouter { impl Router for RuleBasedRouter { fn route(&self, schema: &str, req: RouteRequest) -> Result> { - let cluster_view = self.meta_client.get_cluster_view(); - if let Some(shard_view_map) = cluster_view.schema_shards.get(schema) { - if shard_view_map.is_empty() { + if let Some(shard_nodes) = self.cluster_view.schema_shards.get(schema) { + if shard_nodes.is_empty() { return ErrNoCause { code: StatusCode::NotFound, - msg: "shards from meta is empty", + msg: "No valid shard is found", } .fail(); } @@ -154,7 +164,7 @@ impl Router for RuleBasedRouter { let rule_list_opt = self.schema_rules.get(schema); // TODO(yingwen): Better way to get total shard number - let total_shards = shard_view_map.len(); + let total_shards = shard_nodes.len(); let mut route_vec = Vec::with_capacity(req.metrics.len()); for metric in req.metrics { let mut route = 
Route::new(); @@ -163,10 +173,9 @@ impl Router for RuleBasedRouter { let shard_id = Self::route_metric(route.get_metric(), rule_list_opt, total_shards); let mut endpoint = Endpoint::new(); - if let Some(shard_view) = shard_view_map.get(&shard_id) { - let node = &shard_view.node; + if let Some(node) = shard_nodes.get(&shard_id) { endpoint.set_ip(node.addr.clone()); - endpoint.set_port(node.port); + endpoint.set_port(node.port as u32); } else { return ErrNoCause { code: StatusCode::NotFound, diff --git a/server/src/schema_config_provider/cluster_based.rs b/server/src/schema_config_provider/cluster_based.rs new file mode 100644 index 0000000000..440b8ceb7c --- /dev/null +++ b/server/src/schema_config_provider/cluster_based.rs @@ -0,0 +1,25 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +//! Schema provider based on cluster. + +#![allow(dead_code)] + +use cluster::ClusterRef; + +use crate::schema_config_provider::{Result, SchemaConfigProvider}; + +pub struct ClusterBasedProvider { + cluster: ClusterRef, +} + +impl ClusterBasedProvider { + pub fn new(cluster: ClusterRef) -> Self { + Self { cluster } + } +} + +impl SchemaConfigProvider for ClusterBasedProvider { + fn schema_config(&self, _schema_name: &str) -> Result> { + todo!() + } +} diff --git a/server/src/schema_config_provider/config_based.rs b/server/src/schema_config_provider/config_based.rs new file mode 100644 index 0000000000..e92760064c --- /dev/null +++ b/server/src/schema_config_provider/config_based.rs @@ -0,0 +1,29 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +// The schema config provider based on configs. + +use std::collections::HashMap; + +use cluster::SchemaConfig; + +use crate::schema_config_provider::{Result, SchemaConfigProvider}; + +pub type SchemaConfigs = HashMap; + +/// Provide schema config according to the given config. 
+#[derive(Debug)]
+pub struct ConfigBasedProvider {
+    schema_configs: SchemaConfigs,
+}
+
+impl ConfigBasedProvider {
+    pub fn new(schema_configs: SchemaConfigs) -> Self {
+        Self { schema_configs }
+    }
+}
+
+impl SchemaConfigProvider for ConfigBasedProvider {
+    fn schema_config(&self, schema_name: &str) -> Result<Option<&SchemaConfig>> {
+        Ok(self.schema_configs.get(schema_name))
+    }
+}
diff --git a/server/src/schema_config_provider/mod.rs b/server/src/schema_config_provider/mod.rs
new file mode 100644
index 0000000000..aa403d73d8
--- /dev/null
+++ b/server/src/schema_config_provider/mod.rs
@@ -0,0 +1,23 @@
+// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+
+//! Schema configuration can be retrieved from the [`SchemaConfigProvider`].
+
+use std::sync::Arc;
+
+use cluster::SchemaConfig;
+use snafu::Snafu;
+
+pub mod cluster_based;
+pub mod config_based;
+
+#[derive(Debug, Snafu)]
+#[snafu(visibility(pub))]
+pub enum Error {}
+
+define_result!(Error);
+
+pub type SchemaConfigProviderRef = Arc<dyn SchemaConfigProvider + Send + Sync>;
+
+pub trait SchemaConfigProvider {
+    fn schema_config(&self, schema_name: &str) -> Result<Option<&SchemaConfig>>;
+}
diff --git a/server/src/server.rs b/server/src/server.rs
index 8acc8d70cc..3b6dc4846b 100644
--- a/server/src/server.rs
+++ b/server/src/server.rs
@@ -21,6 +21,8 @@ use crate::{
     limiter::Limiter,
     mysql,
     mysql::error::Error as MysqlError,
+    route::RouterRef,
+    schema_config_provider::SchemaConfigProviderRef,
 };
 
 #[derive(Debug, Snafu)]
@@ -28,6 +30,12 @@ pub enum Error {
     #[snafu(display("Missing runtimes.\nBacktrace:\n{}", backtrace))]
     MissingRuntimes { backtrace: Backtrace },
 
+    #[snafu(display("Missing router.\nBacktrace:\n{}", backtrace))]
+    MissingRouter { backtrace: Backtrace },
+
+    #[snafu(display("Missing schema config provider.\nBacktrace:\n{}", backtrace))]
+    MissingSchemaConfigProvider { backtrace: Backtrace },
+
     #[snafu(display("Missing catalog manager.\nBacktrace:\n{}", backtrace))]
     MissingCatalogManager { backtrace: Backtrace },
 
@@ -61,7 +69,7 @@ pub enum Error {
#[snafu(display("Failed to start grpc service, err:{}", source))] StartGrpcService { source: crate::grpc::Error }, - #[snafu(display("Failed to start clsuter, err:{}", source))] + #[snafu(display("Failed to start cluster, err:{}", source))] StartCluster { source: cluster::Error }, } @@ -108,15 +116,15 @@ impl Server { let catalog_mgr = &self.instance.catalog_manager; let default_catalog = catalog_mgr .catalog_by_name(catalog_mgr.default_catalog_name()) - .expect("Fail to retreive default catalog") + .expect("Fail to retrieve default catalog") .expect("Default catalog doesn't exist"); if default_catalog .schema_by_name(catalog_mgr.default_schema_name()) - .expect("Fail to retreive default schema") + .expect("Fail to retrieve default schema") .is_none() { - warn!("Deafult schema doesn't exist and create it"); + warn!("Default schema doesn't exist and create it"); default_catalog .create_schema(catalog_mgr.default_schema_name()) .await @@ -135,6 +143,8 @@ pub struct Builder { function_registry: Option, limiter: Limiter, cluster: Option, + router: Option, + schema_config_provider: Option, } impl Builder { @@ -148,6 +158,8 @@ impl Builder { function_registry: None, limiter: Limiter::default(), cluster: None, + router: None, + schema_config_provider: None, } } @@ -186,11 +198,21 @@ impl Builder { self } + pub fn router(mut self, router: RouterRef) -> Self { + self.router = Some(router); + self + } + + pub fn schema_config_provider( + mut self, + schema_config_provider: SchemaConfigProviderRef, + ) -> Self { + self.schema_config_provider = Some(schema_config_provider); + self + } + /// Build and run the server pub fn build(self) -> Result> { - // Build runtimes - let runtimes = self.runtimes.context(MissingRuntimes)?; - // Build instance let catalog_manager = self.catalog_manager.context(MissingCatalogManager)?; let query_executor = self.query_executor.context(MissingQueryExecutor)?; @@ -212,6 +234,7 @@ impl Builder { }; // Start http service + let runtimes = 
self.runtimes.context(MissingRuntimes)?; let http_service = http::Builder::new(http_config) .runtimes(runtimes.clone()) .instance(instance.clone()) @@ -229,16 +252,19 @@ impl Builder { .build() .context(BuildMysqlService)?; - let meta_client_config = self.config.meta_client; + let router = self.router.context(MissingRouter)?; + let provider = self + .schema_config_provider + .context(MissingSchemaConfigProvider)?; let env = Arc::new(Environment::new(self.config.grpc_server_cq_count)); let rpc_services = grpc::Builder::new() .bind_addr(self.config.bind_addr) .port(self.config.grpc_port) - .meta_client_config(meta_client_config) .env(env) .runtimes(runtimes) .instance(instance.clone()) - .route_rules(self.config.route_rules) + .router(router) + .schema_config_provider(provider) .build() .context(BuildGrpcService)?; diff --git a/src/adapter.rs b/src/adapter.rs index b13cceb3cb..a7b73a1399 100644 --- a/src/adapter.rs +++ b/src/adapter.rs @@ -6,7 +6,7 @@ use catalog::{ use catalog_impls::{SchemaIdAlloc, TableIdAlloc}; use cluster::TableManipulator; use log::debug; -use meta_client_v2::{types::DropTableRequest, MetaClientRef}; +use meta_client::{types::DropTableRequest, MetaClientRef}; use table_engine::{ engine::TableEngineRef, table::{SchemaId, TableId}, @@ -17,7 +17,7 @@ pub struct SchemaIdAllocAdapter(pub MetaClientRef); #[async_trait] impl SchemaIdAlloc for SchemaIdAllocAdapter { - type Error = meta_client_v2::Error; + type Error = meta_client::Error; async fn alloc_schema_id<'a>(&self, schema_name: NameRef<'a>) -> Result { self.0 @@ -32,7 +32,7 @@ pub struct TableIdAllocAdapter(pub MetaClientRef); #[async_trait] impl TableIdAlloc for TableIdAllocAdapter { - type Error = meta_client_v2::Error; + type Error = meta_client::Error; async fn alloc_table_id<'a>( &self, diff --git a/src/bin/ceresdb-server.rs b/src/bin/ceresdb-server.rs index e99e80c33e..1669530423 100644 --- a/src/bin/ceresdb-server.rs +++ b/src/bin/ceresdb-server.rs @@ -2,9 +2,7 @@ //! 
The main entry point to start the server -// TODO(yingwen): ceresdb-server is a legacy name, maybe use a new name - -use std::env; +use std::{env, net::SocketAddr}; use ceresdb::setup; use clap::{App, Arg}; @@ -28,6 +26,12 @@ fn fetch_version() -> String { ) } +// Parse the raw addr and panic if it is invalid. +fn parse_node_addr_or_fail(raw_addr: &str) -> (String, u16) { + let socket_addr: SocketAddr = raw_addr.parse().expect("invalid node addr"); + (socket_addr.ip().to_string(), socket_addr.port()) +} + fn main() { let version = fetch_version(); let matches = App::new("CeresDB Server") @@ -51,10 +55,12 @@ fn main() { }; if let Ok(node_addr) = env::var(NODE_ADDR) { - config.meta_client.node = node_addr; + let (ip, port) = parse_node_addr_or_fail(&node_addr); + config.cluster.node.addr = ip; + config.cluster.node.port = port; } if let Ok(cluster) = env::var(CLUSTER_NAME) { - config.meta_client.cluster = cluster; + config.cluster.meta_client.cluster_name = cluster; } // Setup log. diff --git a/src/setup.rs b/src/setup.rs index 6701c07eed..b708e80e62 100644 --- a/src/setup.rs +++ b/src/setup.rs @@ -14,10 +14,17 @@ use common_util::runtime; use df_operator::registry::FunctionRegistryImpl; use log::info; use logger::RuntimeLevel; -use meta_client_v2::meta_impl; +use meta_client::meta_impl; use query_engine::executor::{Executor, ExecutorImpl}; use server::{ config::{Config, DeployMode, RuntimeConfig}, + route::{ + cluster_based::ClusterBasedRouter, + rule_based::{ClusterView, RuleBasedRouter}, + }, + schema_config_provider::{ + cluster_based::ClusterBasedProvider, config_based::ConfigBasedProvider, + }, server::Builder, table_engine::{MemoryTableEngine, TableEngineProxy}, }; @@ -119,7 +126,9 @@ where .function_registry(function_registry); let builder = match config.deploy_mode { - DeployMode::Standalone => build_in_standalone_mode(builder, analytic, engine_proxy).await, + DeployMode::Standalone => { + build_in_standalone_mode(&config, builder, analytic, 
engine_proxy).await + } DeployMode::Cluster => { build_in_cluster_mode(&config, builder, &runtimes, engine_proxy).await } @@ -170,10 +179,17 @@ async fn build_in_cluster_mode( Arc::new(cluster_impl) }; - builder.catalog_manager(catalog_manager).cluster(cluster) + let router = Arc::new(ClusterBasedRouter::new(cluster.clone())); + let schema_config_provider = Arc::new(ClusterBasedProvider::new(cluster.clone())); + builder + .catalog_manager(catalog_manager) + .cluster(cluster) + .router(router) + .schema_config_provider(schema_config_provider) } async fn build_in_standalone_mode( + config: &Config, builder: Builder, table_engine: TableEngineRef, engine_proxy: TableEngineRef, @@ -184,5 +200,18 @@ async fn build_in_standalone_mode( // Create catalog manager, use analytic table as backend let catalog_manager = Arc::new(CatalogManagerImpl::new(Arc::new(table_based_manager))); - builder.catalog_manager(catalog_manager) + + // Build static router and schema config provider + let cluster_view = ClusterView::from(&config.static_route.topology); + let schema_configs = cluster_view.schema_configs.clone(); + let router = Arc::new(RuleBasedRouter::new( + cluster_view, + config.static_route.rule_list.clone(), + )); + let schema_config_provider = Arc::new(ConfigBasedProvider::new(schema_configs)); + + builder + .catalog_manager(catalog_manager) + .router(router) + .schema_config_provider(schema_config_provider) }