refactor: use TableManager to implement CatalogManager and `SchemaManager` (apache#260)

* refactor: use TableManager to implement CatalogManager and SchemaManager

Signed-off-by: Ruihang Xia <[email protected]>

* chore: style and doc changes

Signed-off-by: Ruihang Xia <[email protected]>

* chore: add license header

Signed-off-by: Ruihang Xia <[email protected]>

Signed-off-by: Ruihang Xia <[email protected]>
waynexia authored Sep 19, 2022
1 parent 25557c7 commit aa6c4c4
Showing 6 changed files with 344 additions and 77 deletions.
202 changes: 202 additions & 0 deletions catalog_impls/src/cluster.rs
@@ -0,0 +1,202 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

use std::sync::Arc;

use async_trait::async_trait;
use catalog::{
    consts,
    manager::Manager,
    schema::{
        self, CloseOptions, CloseTableRequest, CreateOptions, CreateTableRequest, DropOptions,
        DropTableRequest, NameRef, OpenOptions, OpenTableRequest, Schema, SchemaRef,
    },
    Catalog, CatalogRef,
};
use cluster::{cluster_impl::ClusterImpl, table_manager::TableManager};
use table_engine::table::{SchemaId, TableRef};

/// Catalog/Schema manager in the cluster mode.
///
/// This just redirects requests to [ClusterImpl] (which implements
/// [cluster::Cluster]) for orphan rule.
pub struct ManagerImpl(ClusterImpl);

impl Manager for ManagerImpl {
    fn default_catalog_name(&self) -> NameRef {
        consts::DEFAULT_CATALOG
    }

    fn default_schema_name(&self) -> NameRef {
        consts::DEFAULT_SCHEMA
    }

    fn catalog_by_name(&self, name: NameRef) -> catalog::manager::Result<Option<CatalogRef>> {
        let catalog = self.0.table_manager().get_catalog_name(name).map(|name| {
            Arc::new(CatalogImpl {
                name,
                table_manager: self.0.table_manager().clone(),
            }) as _
        });

        Ok(catalog)
    }

    fn all_catalogs(&self) -> catalog::manager::Result<Vec<CatalogRef>> {
        let catalogs = self
            .0
            .table_manager()
            .get_all_catalog_names()
            .into_iter()
            .map(|name| {
                Arc::new(CatalogImpl {
                    name,
                    table_manager: self.0.table_manager().clone(),
                }) as _
            })
            .collect();

        Ok(catalogs)
    }
}

pub struct CatalogImpl {
    /// Catalog name
    name: String,
    table_manager: TableManager,
}

#[async_trait]
impl Catalog for CatalogImpl {
    /// Get the catalog name
    fn name(&self) -> NameRef {
        &self.name
    }

    /// Find schema by name
    fn schema_by_name(&self, name: NameRef) -> catalog::Result<Option<SchemaRef>> {
        let schema = self
            .table_manager
            .get_schema_id(&self.name, name)
            .map(|id| {
                Arc::new(SchemaImpl {
                    catalog_name: self.name.clone(),
                    schema_name: name.to_string(),
                    id: id.into(),
                    table_manager: self.table_manager.clone(),
                }) as _
            });

        Ok(schema)
    }

    #[allow(unused_variables)]
    async fn create_schema<'a>(&'a self, name: NameRef<'a>) -> catalog::Result<()> {
        todo!()
    }

    /// All schemas
    fn all_schemas(&self) -> catalog::Result<Vec<SchemaRef>> {
        let schemas = self
            .table_manager
            .get_all_schema_infos(&self.name)
            .into_iter()
            .map(|info| {
                Arc::new(SchemaImpl {
                    catalog_name: self.name.clone(),
                    schema_name: info.name,
                    id: info.id.into(),
                    table_manager: self.table_manager.clone(),
                }) as _
            })
            .collect();

        Ok(schemas)
    }
}

pub struct SchemaImpl {
    /// Catalog name
    catalog_name: String,
    /// Schema name
    schema_name: String,
    /// Schema id
    id: SchemaId,
    table_manager: TableManager,
}

#[async_trait]
impl Schema for SchemaImpl {
    /// Get schema name.
    fn name(&self) -> NameRef {
        &self.schema_name
    }

    /// Get schema id
    fn id(&self) -> SchemaId {
        self.id
    }

    /// Find table by name.
    fn table_by_name(&self, name: NameRef) -> schema::Result<Option<TableRef>> {
        let table = self
            .table_manager
            .table_by_name(&self.catalog_name, &self.schema_name, name);

        Ok(table)
    }

    /// Create table according to `request`.
    #[allow(unused_variables)]
    async fn create_table(
        &self,
        request: CreateTableRequest,
        opts: CreateOptions,
    ) -> schema::Result<TableRef> {
        todo!()
    }

    /// Drop table according to `request`.
    ///
    /// Returns true if the table is really dropped.
    #[allow(unused_variables)]
    async fn drop_table(
        &self,
        request: DropTableRequest,
        opts: DropOptions,
    ) -> schema::Result<bool> {
        todo!()
    }

    /// Open the table according to `request`.
    ///
    /// Return None if table does not exist.
    #[allow(unused_variables)]
    async fn open_table(
        &self,
        request: OpenTableRequest,
        opts: OpenOptions,
    ) -> schema::Result<Option<TableRef>> {
        todo!()
    }

    /// Close the table according to `request`.
    ///
    /// Return false if table does not exist.
    #[allow(unused_variables)]
    async fn close_table(
        &self,
        request: CloseTableRequest,
        opts: CloseOptions,
    ) -> schema::Result<()> {
        todo!()
    }

    /// All tables
    fn all_tables(&self) -> schema::Result<Vec<TableRef>> {
        let tables = self
            .table_manager
            .get_all_table_ref(&self.catalog_name, &self.schema_name);

        Ok(tables)
    }
}
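
For orientation, here is a minimal usage sketch of how the three wrappers above chain together. It is not part of the commit: it assumes a ClusterImpl value has already been constructed elsewhere, that the code sits in this same module (the tuple field of ManagerImpl is private), and the table name "my_table" is purely illustrative.

// Hypothetical sketch, not committed code: resolve a table through the new
// Manager -> Catalog -> Schema chain. Every step is answered by the shared
// TableManager held inside the wrapped ClusterImpl.
fn resolve_table(cluster: ClusterImpl) -> Option<TableRef> {
    let manager = ManagerImpl(cluster);

    let catalog = manager
        .catalog_by_name(manager.default_catalog_name())
        .ok()??;
    let schema = catalog
        .schema_by_name(manager.default_schema_name())
        .ok()??;

    // "my_table" is a placeholder; table_by_name comes from the Schema trait.
    schema.table_by_name("my_table").ok()?
}
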
1 change: 1 addition & 0 deletions catalog_impls/src/lib.rs
@@ -12,6 +12,7 @@ use system_catalog::{tables::Tables, SystemTableAdapter};

use crate::system_tables::{SystemTables, SystemTablesBuilder};

pub mod cluster;
mod system_tables;
pub mod table_based;
pub mod volatile;
24 changes: 9 additions & 15 deletions cluster/src/cluster_impl.rs
@@ -21,10 +21,8 @@ use tokio::{
};

use crate::{
-    config::ClusterConfig,
-    table_manager::{ShardTableInfo, TableManager},
-    topology::ClusterTopology,
-    Cluster, ClusterNodesNotFound, ClusterNodesResp, MetaClientFailure, Result, StartMetaClient,
+    config::ClusterConfig, table_manager::TableManager, topology::ClusterTopology, Cluster,
+    ClusterNodesNotFound, ClusterNodesResp, MetaClientFailure, Result, StartMetaClient,
     TableManipulator, TableManipulatorRef,
};

@@ -98,6 +96,10 @@ impl ClusterImpl {
     fn error_wait_lease(&self) -> Duration {
         self.config.meta_client.lease.0 / 2
     }
+
+    pub fn table_manager(&self) -> &TableManager {
+        &self.inner.table_manager
+    }
 }

struct Inner {
@@ -142,17 +144,9 @@ impl EventHandler for Inner {

                 Ok(())
             }
-            ActionCmd::CreateTableCmd(cmd) => self
-                .table_manager
-                .add_shard_table(ShardTableInfo::from(cmd))
-                .map_err(|e| Box::new(e) as _),
-            ActionCmd::DropTableCmd(cmd) => {
-                warn!("Drop table, schema:{}, table:{}", cmd.schema_name, cmd.name);
-
-                self.table_manager.drop_table(&cmd.schema_name, &cmd.name);
-                Ok(())
-            }
-            ActionCmd::MetaNoneCmd(_)
+            ActionCmd::CreateTableCmd(_) => todo!(),
+            ActionCmd::DropTableCmd(_)
+            | ActionCmd::MetaNoneCmd(_)
             | ActionCmd::MetaCloseCmd(_)
             | ActionCmd::MetaSplitCmd(_)
             | ActionCmd::MetaChangeRoleCmd(_) => {
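
The new table_manager() accessor is what the catalog_impls::cluster wrappers delegate to: the Manager trait cannot be implemented on ClusterImpl inside catalog_impls (the doc comment above cites the orphan rule), so the ManagerImpl newtype wraps ClusterImpl and reaches its TableManager through this method. Below is a short sketch of what the accessor enables for a caller outside the cluster crate; the return type of get_all_catalog_names() is an assumption based on its use in catalog_impls/src/cluster.rs.

// Sketch (assumption, not part of this diff): read table metadata from
// outside the cluster crate via the newly public accessor.
fn list_catalog_names(cluster: &ClusterImpl) -> Vec<String> {
    // Same TableManager API the new ManagerImpl/CatalogImpl wrappers rely on.
    cluster.table_manager().get_all_catalog_names()
}
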
2 changes: 1 addition & 1 deletion cluster/src/lib.rs
@@ -14,7 +14,7 @@ use snafu::{Backtrace, Snafu};

 pub mod cluster_impl;
 pub mod config;
-mod table_manager;
+pub mod table_manager;
 // FIXME: Remove this lint ignore derive when topology about schema tables is
 // finished.
 #[allow(dead_code)]
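
Making table_manager a public module is what permits the import at the top of the new catalog_impls/src/cluster.rs. A downstream crate can now name the type directly; the helper below is illustrative only and not from this commit.

// Cross-crate import enabled by the visibility change; this is the same path
// the new catalog_impls code uses.
use cluster::table_manager::TableManager;

fn shared_handle(manager: &TableManager) -> TableManager {
    // The diff above clones TableManager once per catalog/schema wrapper, so
    // it is assumed to be cheaply cloneable.
    manager.clone()
}
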