refactor: remove SchemaIdAlloc and TableIdAlloc (apache#238)
* tiny tidies

Signed-off-by: Ruihang Xia <[email protected]>

* remove the two IdAlloc_s

Signed-off-by: Ruihang Xia <[email protected]>

* run cargo fmt

Signed-off-by: Ruihang Xia <[email protected]>

Signed-off-by: Ruihang Xia <[email protected]>
waynexia authored Sep 7, 2022
1 parent 7e9d675 commit 61b7b74
Showing 11 changed files with 255 additions and 340 deletions.
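
In short, this commit deletes the crate-local SchemaIdAlloc and TableIdAlloc traits from catalog_impls and has the volatile catalog ask CeresMeta for ids directly through a MetaClientRef. The code below is a simplified, hypothetical sketch of that new shape; the trait and request types are stand-ins for illustration, not the real meta_client/cluster definitions.

// Hypothetical sketch of the post-refactor allocation path; all types here are
// simplified stand-ins, not the real meta_client/cluster APIs.
use std::sync::Arc;

pub struct AllocSchemaIdRequest { pub name: String }
pub struct AllocSchemaIdResponse { pub id: u32 }

#[async_trait::async_trait]
pub trait MetaClient: Send + Sync {
    async fn alloc_schema_id(
        &self,
        req: AllocSchemaIdRequest,
    ) -> Result<AllocSchemaIdResponse, Box<dyn std::error::Error + Send + Sync>>;
}

pub type MetaClientRef = Arc<dyn MetaClient>;

// Before: CatalogImpl<S, T> was generic over SchemaIdAlloc/TableIdAlloc.
// After: it simply holds a MetaClientRef and calls the meta service directly.
struct CatalogImpl {
    meta_client: MetaClientRef,
}

impl CatalogImpl {
    async fn alloc_schema_id(
        &self,
        schema: &str,
    ) -> Result<u32, Box<dyn std::error::Error + Send + Sync>> {
        // Delegate id allocation to the meta service instead of a local allocator.
        let resp = self
            .meta_client
            .alloc_schema_id(AllocSchemaIdRequest { name: schema.to_string() })
            .await?;
        Ok(resp.id)
    }
}
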
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion catalog/src/lib.rs
@@ -38,7 +38,6 @@ pub enum Error {
define_result!(Error);

/// Catalog manage schemas
// TODO(yingwen): Maybe use async trait?
// TODO(yingwen): Provide a context
// TODO(yingwen): Catalog id?
#[async_trait]
2 changes: 2 additions & 0 deletions catalog_impls/Cargo.toml
@@ -12,9 +12,11 @@ workspace = true
# Workspace dependencies, in alphabetical order
async-trait = "0.1.53"
catalog = { path = "../catalog" }
cluster = { path = "../cluster" }
common_types = { path = "../common_types" }
common_util = { path = "../common_util" }
log = "0.4"
meta_client = { path = "../meta_client" }
snafu = { version ="0.6.10", features = ["backtraces"]}
system_catalog = { path = "../system_catalog" }
table_engine = { path = "../table_engine" }
28 changes: 0 additions & 28 deletions catalog_impls/src/lib.rs
@@ -2,15 +2,13 @@

use std::sync::Arc;

use async_trait::async_trait;
use catalog::{
consts::SYSTEM_CATALOG,
manager::{Manager, ManagerRef},
schema::NameRef,
CatalogRef,
};
use system_catalog::{tables::Tables, SystemTableAdapter};
use table_engine::table::{SchemaId, TableId};

use crate::system_tables::{SystemTables, SystemTablesBuilder};

@@ -57,29 +55,3 @@ impl Manager for CatalogManagerImpl {
self.user_catalog_manager.all_catalogs()
}
}

#[async_trait]
pub trait SchemaIdAlloc: Send + Sync {
type Error: std::error::Error + Send + Sync + 'static;
async fn alloc_schema_id<'a>(
&self,
schema_name: NameRef<'a>,
) -> std::result::Result<SchemaId, Self::Error>;
}

#[async_trait]
pub trait TableIdAlloc: Send + Sync {
type Error: std::error::Error + Send + Sync + 'static;
async fn alloc_table_id<'a>(
&self,
schema_name: NameRef<'a>,
table_name: NameRef<'a>,
) -> std::result::Result<TableId, Self::Error>;

async fn invalidate_table_id<'a>(
&self,
schema_name: NameRef<'a>,
table_name: NameRef<'a>,
table_id: TableId,
) -> std::result::Result<(), Self::Error>;
}
94 changes: 42 additions & 52 deletions catalog_impls/src/volatile.rs
@@ -22,29 +22,22 @@ use catalog::{
};
use common_types::schema::SchemaName;
use log::{debug, info};
use meta_client::MetaClientRef;
use snafu::{ensure, ResultExt};
use table_engine::table::{SchemaId, TableRef};
use table_engine::table::{SchemaId, TableId, TableRef};
use tokio::sync::Mutex;

use crate::{SchemaIdAlloc, TableIdAlloc};

/// ManagerImpl manages multiple volatile catalogs.
pub struct ManagerImpl<S, T> {
catalogs: HashMap<String, Arc<CatalogImpl<S, T>>>,
schema_id_alloc: Arc<S>,
table_id_alloc: Arc<T>,
pub struct ManagerImpl {
catalogs: HashMap<String, Arc<CatalogImpl>>,
meta_client: MetaClientRef,
}

impl<S, T> ManagerImpl<S, T>
where
S: SchemaIdAlloc + 'static,
T: TableIdAlloc + 'static,
{
pub async fn new(schema_id_alloc: S, table_id_alloc: T) -> Self {
impl ManagerImpl {
pub async fn new(meta_client: MetaClientRef) -> Self {
let mut manager = ManagerImpl {
catalogs: HashMap::new(),
table_id_alloc: Arc::new(table_id_alloc),
schema_id_alloc: Arc::new(schema_id_alloc),
meta_client,
};

manager.maybe_create_default_catalog().await;
@@ -53,11 +46,7 @@ where
}
}

impl<S, T> Manager for ManagerImpl<S, T>
where
S: SchemaIdAlloc + 'static,
T: TableIdAlloc + 'static,
{
impl Manager for ManagerImpl {
fn default_catalog_name(&self) -> NameRef {
consts::DEFAULT_CATALOG
}
@@ -80,11 +69,7 @@ where
}
}

impl<S, T> ManagerImpl<S, T>
where
S: SchemaIdAlloc,
T: TableIdAlloc + 'static,
{
impl ManagerImpl {
async fn maybe_create_default_catalog(&mut self) {
// Try to get default catalog, create it if not exists.
if self.catalogs.get(consts::DEFAULT_CATALOG).is_none() {
@@ -94,12 +79,11 @@ where
};
}

async fn create_catalog(&mut self, catalog_name: String) -> Arc<CatalogImpl<S, T>> {
async fn create_catalog(&mut self, catalog_name: String) -> Arc<CatalogImpl> {
let catalog = Arc::new(CatalogImpl {
name: catalog_name.clone(),
schemas: RwLock::new(HashMap::new()),
schema_id_alloc: self.schema_id_alloc.clone(),
table_id_alloc: self.table_id_alloc.clone(),
meta_client: self.meta_client.clone(),
});

self.catalogs.insert(catalog_name, catalog.clone());
@@ -113,21 +97,16 @@ where
/// The schema and table id are allocated (and maybe stored) by other components
/// so there is no recovering work for all the schemas and tables during
/// initialization.
struct CatalogImpl<S, T> {
struct CatalogImpl {
/// Catalog name
name: String,
/// All the schemas belonging to the catalog.
schemas: RwLock<HashMap<SchemaName, SchemaRef>>,
schema_id_alloc: Arc<S>,
table_id_alloc: Arc<T>,
meta_client: MetaClientRef,
}

#[async_trait]
impl<S, T> Catalog for CatalogImpl<S, T>
where
S: SchemaIdAlloc,
T: TableIdAlloc + 'static,
{
impl Catalog for CatalogImpl {
fn name(&self) -> NameRef {
&self.name
}
@@ -147,14 +126,17 @@ where
}

let schema_id = self
.schema_id_alloc
.alloc_schema_id(name)
.meta_client
.alloc_schema_id(cluster::AllocSchemaIdRequest {
name: name.to_string(),
})
.await
.map_err(|e| Box::new(e) as _)
.context(catalog::CreateSchema {
catalog: &self.name,
schema: name,
})?;
})
.map(|resp| SchemaId::from(resp.id))?;

let mut schemas = self.schemas.write().unwrap();
if schemas.get(name).is_some() {
@@ -165,7 +147,7 @@ where
self.name.to_string(),
name.to_string(),
schema_id,
self.table_id_alloc.clone(),
self.meta_client.clone(),
));

schemas.insert(name.to_string(), schema);
@@ -191,7 +173,7 @@ where
///
/// The tables belonging to the schema won't be recovered during initialization
/// and will be opened afterwards.
struct SchemaImpl<T> {
struct SchemaImpl {
/// Catalog name
catalog_name: String,
/// Schema name
@@ -201,23 +183,23 @@ struct SchemaImpl<T> {
/// Guard for creating/dropping table
create_table_mutex: Mutex<()>,
schema_id: SchemaId,
table_id_alloc: Arc<T>,
meta_client: MetaClientRef,
}

impl<T> SchemaImpl<T> {
impl SchemaImpl {
fn new(
catalog_name: String,
schema_name: String,
schema_id: SchemaId,
table_id_alloc: Arc<T>,
meta_client: MetaClientRef,
) -> Self {
Self {
catalog_name,
schema_name,
tables: RwLock::new(HashMap::new()),
create_table_mutex: Mutex::new(()),
schema_id,
table_id_alloc,
meta_client,
}
}

@@ -265,7 +247,7 @@ impl<T> SchemaImpl<T> {
}

#[async_trait]
impl<T: TableIdAlloc> Schema for SchemaImpl<T> {
impl Schema for SchemaImpl {
fn name(&self) -> NameRef {
&self.schema_name
}
@@ -306,14 +288,18 @@ impl<T: TableIdAlloc> Schema for SchemaImpl<T> {
}

let table_id = self
.table_id_alloc
.alloc_table_id(&request.schema_name, &request.table_name)
.meta_client
.alloc_table_id(cluster::AllocTableIdRequest {
schema_name: request.schema_name.to_string(),
name: request.table_name.to_string(),
})
.await
.map_err(|e| Box::new(e) as _)
.context(schema::AllocateTableId {
schema: &self.schema_name,
table: &request.table_name,
})?;
})
.map(|v| TableId::from(v.id))?;

let request = request.into_engine_create_request(table_id);

@@ -367,9 +353,13 @@ impl<T: TableIdAlloc> Schema for SchemaImpl<T> {
.await
.context(DropTable)?;

// invalidate the table id after table is dropped in engine.
self.table_id_alloc
.invalidate_table_id(&schema_name, &table_name, table.id())
// Request CeresMeta to drop this table.
self.meta_client
.drop_table(cluster::DropTableRequest {
schema_name: schema_name.to_string(),
name: table_name.to_string(),
id: table.id().as_u64(),
})
.await
.map_err(|e| Box::new(e) as _)
.context(schema::InvalidateTableId {
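
With the traits gone, wiring up the volatile catalog manager only needs a meta client handle. A hypothetical call site might look like the sketch below; it assumes the volatile module is exported by catalog_impls and that a MetaClientRef has already been built elsewhere.

// Hypothetical call site; assumes `volatile` is a public module of catalog_impls
// and that a MetaClientRef was constructed elsewhere.
use catalog_impls::volatile::ManagerImpl;
use meta_client::MetaClientRef;

async fn init_catalog(meta_client: MetaClientRef) -> ManagerImpl {
    // Before this commit the constructor took separate schema/table id allocators;
    // now the manager drives all id allocation through CeresMeta itself.
    ManagerImpl::new(meta_client).await
}
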
14 changes: 7 additions & 7 deletions cluster/src/cluster_impl.rs
@@ -12,7 +12,7 @@ use meta_client::{
types::{
ActionCmd, GetNodesRequest, GetShardTablesRequest, RouteTablesRequest, RouteTablesResponse,
},
EventHandler, MetaClient,
EventHandler, MetaClientRef,
};
use snafu::{OptionExt, ResultExt};
use tokio::{
@@ -25,7 +25,7 @@ use crate::{
table_manager::{ShardTableInfo, TableManager},
topology::ClusterTopology,
Cluster, ClusterNodesNotFound, ClusterNodesResp, MetaClientFailure, Result, StartMetaClient,
TableManipulator,
TableManipulator, TableManipulatorRef,
};

/// ClusterImpl is an implementation of [`Cluster`] based [`MetaClient`].
Expand All @@ -43,8 +43,8 @@ pub struct ClusterImpl {

impl ClusterImpl {
pub fn new(
meta_client: Arc<dyn MetaClient + Send + Sync>,
table_manipulator: Arc<dyn TableManipulator + Send + Sync>,
meta_client: MetaClientRef,
table_manipulator: TableManipulatorRef,
config: ClusterConfig,
runtime: Arc<Runtime>,
) -> Result<Self> {
@@ -102,8 +102,8 @@ impl ClusterImpl {

struct Inner {
table_manager: TableManager,
meta_client: Arc<dyn MetaClient + Send + Sync>,
table_manipulator: Arc<dyn TableManipulator + Send + Sync>,
meta_client: MetaClientRef,
table_manipulator: TableManipulatorRef,
#[allow(dead_code)]
topology: RwLock<ClusterTopology>,
}
@@ -166,7 +166,7 @@ impl EventHandler for Inner {

impl Inner {
fn new(
meta_client: Arc<dyn MetaClient + Send + Sync>,
meta_client: MetaClientRef,
table_manipulator: Arc<dyn TableManipulator + Send + Sync>,
) -> Result<Self> {
Ok(Self {
2 changes: 2 additions & 0 deletions docs/guides/src/dev/crate-deps.dot
@@ -34,6 +34,8 @@ digraph G {
catalog_impls -> catalog
catalog_impls -> system_catalog
catalog_impls -> table_engine
catalog_impls -> cluster
catalog_impls -> meta_client

cluster -> analytic_engine
cluster -> catalog
(Diffs for the remaining 4 changed files were not loaded on this page.)
