Skip to content

Commit 803b7f0

Browse files
feat: implement "drop table" in distributed mode (both in SQL and gRPC) (#944)
* feat: implement "drop table" in distributed mode (both in SQL and gRPC) refactor: create distributed table some details: - set table global value in Meta, as well as table routes value. Datanode only set table regional value - complete instance SQL tests both in standalone and distributed mode * fix: rebase develop * fix: resolve PR comments
1 parent 37ca5ba commit 803b7f0

File tree

28 files changed

+762
-394
lines changed

28 files changed

+762
-394
lines changed

Cargo.lock

Lines changed: 6 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ arrow-flight = "29.0"
5252
arrow-schema = { version = "29.0", features = ["serde"] }
5353
async-stream = "0.3"
5454
async-trait = "0.1"
55+
chrono = { version = "0.4", features = ["serde"] }
5556
# TODO(LFC): Use released Datafusion when it officially dpendent on Arrow 29.0
5657
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "4917235a398ae20145c87d20984e6367dc1a0c1e" }
5758
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "4917235a398ae20145c87d20984e6367dc1a0c1e" }
@@ -65,6 +66,7 @@ parquet = "29.0"
6566
paste = "1.0"
6667
prost = "0.11"
6768
serde = { version = "1.0", features = ["derive"] }
69+
serde_json = "1.0"
6870
snafu = { version = "0.7", features = ["backtraces"] }
6971
sqlparser = "0.28"
7072
tokio = { version = "1.24.2", features = ["full"] }

src/api/greptime/v1/meta/route.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ message CreateRequest {
3636

3737
TableName table_name = 2;
3838
repeated Partition partitions = 3;
39+
bytes table_info = 4;
3940
}
4041

4142
message RouteRequest {

src/catalog/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ table = { path = "../table" }
3333
tokio.workspace = true
3434

3535
[dev-dependencies]
36-
chrono = "0.4"
36+
chrono.workspace = true
3737
log-store = { path = "../log-store" }
3838
mito = { path = "../mito", features = ["test"] }
3939
object-store = { path = "../object-store" }

src/catalog/src/error.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,12 @@ pub enum Error {
181181
source: BoxedError,
182182
},
183183

184+
#[snafu(display("{source}"))]
185+
Internal {
186+
#[snafu(backtrace)]
187+
source: BoxedError,
188+
},
189+
184190
#[snafu(display("Failed to execute system catalog table scan, source: {}", source))]
185191
SystemCatalogTableScanExec {
186192
#[snafu(backtrace)]
@@ -203,6 +209,12 @@ pub enum Error {
203209
#[snafu(backtrace)]
204210
source: datatypes::error::Error,
205211
},
212+
213+
#[snafu(display("Failed to serialize or deserialize catalog entry: {}", source))]
214+
CatalogEntrySerde {
215+
#[snafu(backtrace)]
216+
source: common_catalog::error::Error,
217+
},
206218
}
207219

208220
pub type Result<T> = std::result::Result<T, Error>;
@@ -224,7 +236,9 @@ impl ErrorExt for Error {
224236
Error::SystemCatalogTypeMismatch { .. } => StatusCode::Internal,
225237

226238
Error::ReadSystemCatalog { source, .. } => source.status_code(),
227-
Error::InvalidCatalogValue { source, .. } => source.status_code(),
239+
Error::InvalidCatalogValue { source, .. } | Error::CatalogEntrySerde { source } => {
240+
source.status_code()
241+
}
228242

229243
Error::TableExists { .. } => StatusCode::TableAlreadyExists,
230244
Error::TableNotExist { .. } => StatusCode::TableNotFound,
@@ -240,9 +254,11 @@ impl ErrorExt for Error {
240254
Error::MetaSrv { source, .. } => source.status_code(),
241255
Error::SystemCatalogTableScan { source } => source.status_code(),
242256
Error::SystemCatalogTableScanExec { source } => source.status_code(),
243-
Error::InvalidTableSchema { source, .. } => source.status_code(),
244-
Error::InvalidTableInfoInCatalog { .. } => StatusCode::Unexpected,
245-
Error::SchemaProviderOperation { source } => source.status_code(),
257+
Error::InvalidTableSchema { source, .. }
258+
| Error::InvalidTableInfoInCatalog { source } => source.status_code(),
259+
Error::SchemaProviderOperation { source } | Error::Internal { source } => {
260+
source.status_code()
261+
}
246262

247263
Error::Unimplemented { .. } => StatusCode::Unsupported,
248264
}

src/catalog/src/remote/manager.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -430,11 +430,18 @@ impl CatalogManager for RemoteCatalogManager {
430430
Ok(true)
431431
}
432432

433-
async fn deregister_table(&self, _request: DeregisterTableRequest) -> Result<bool> {
434-
UnimplementedSnafu {
435-
operation: "deregister table",
436-
}
437-
.fail()
433+
async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<bool> {
434+
let catalog_name = &request.catalog;
435+
let schema_name = &request.schema;
436+
let schema = self
437+
.schema(catalog_name, schema_name)?
438+
.context(SchemaNotFoundSnafu {
439+
catalog: catalog_name,
440+
schema: schema_name,
441+
})?;
442+
443+
let result = schema.deregister_table(&request.table_name)?;
444+
Ok(result.is_none())
438445
}
439446

440447
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool> {

src/common/catalog/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,6 @@ serde_json = "1.0"
1616
snafu = { version = "0.7", features = ["backtraces"] }
1717

1818
[dev-dependencies]
19-
chrono = "0.4"
19+
chrono.workspace = true
2020
tempdir = "0.3"
2121
tokio.workspace = true

src/common/time/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ edition.workspace = true
55
license.workspace = true
66

77
[dependencies]
8-
chrono = "0.4"
8+
chrono.workspace = true
99
common-error = { path = "../error" }
1010
serde = { version = "1.0", features = ["derive"] }
1111
serde_json = "1.0"

src/frontend/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ api = { path = "../api" }
1010
async-stream.workspace = true
1111
async-trait = "0.1"
1212
catalog = { path = "../catalog" }
13-
chrono = "0.4"
13+
chrono.workspace = true
1414
client = { path = "../client" }
1515
common-base = { path = "../common/base" }
1616
common-catalog = { path = "../common/catalog" }

src/frontend/src/catalog.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use std::any::Any;
1616
use std::collections::HashSet;
1717
use std::sync::Arc;
1818

19-
use catalog::error::{self as catalog_err, InvalidCatalogValueSnafu};
19+
use catalog::error::{self as catalog_err, InvalidCatalogValueSnafu, Result as CatalogResult};
2020
use catalog::helper::{
2121
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, SchemaKey,
2222
TableGlobalKey, TableGlobalValue,
@@ -60,12 +60,10 @@ impl FrontendCatalogManager {
6060
self.backend.clone()
6161
}
6262

63-
#[cfg(test)]
6463
pub(crate) fn partition_manager(&self) -> PartitionRuleManagerRef {
6564
self.partition_manager.clone()
6665
}
6766

68-
#[cfg(test)]
6967
pub(crate) fn datanode_clients(&self) -> Arc<DatanodeClients> {
7068
self.datanode_clients.clone()
7169
}
@@ -79,15 +77,13 @@ impl CatalogManager for FrontendCatalogManager {
7977
Ok(())
8078
}
8179

82-
async fn register_table(&self, _request: RegisterTableRequest) -> catalog::error::Result<bool> {
83-
unimplemented!()
80+
// TODO(LFC): Handle the table caching in (de)register_table.
81+
async fn register_table(&self, _request: RegisterTableRequest) -> CatalogResult<bool> {
82+
Ok(true)
8483
}
8584

86-
async fn deregister_table(
87-
&self,
88-
_request: DeregisterTableRequest,
89-
) -> catalog::error::Result<bool> {
90-
unimplemented!()
85+
async fn deregister_table(&self, _request: DeregisterTableRequest) -> CatalogResult<bool> {
86+
Ok(true)
9187
}
9288

9389
async fn register_schema(
@@ -289,7 +285,7 @@ impl SchemaProvider for FrontendSchemaProvider {
289285
let partition_manager = self.partition_manager.clone();
290286
let datanode_clients = self.datanode_clients.clone();
291287
let table_name = TableName::new(&self.catalog_name, &self.schema_name, name);
292-
let result: Result<Option<TableRef>, catalog::error::Error> = std::thread::spawn(|| {
288+
let result: CatalogResult<Option<TableRef>> = std::thread::spawn(|| {
293289
common_runtime::block_on_read(async move {
294290
let res = match backend.get(table_global_key.to_string().as_bytes()).await? {
295291
None => {

src/frontend/src/error.rs

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -181,10 +181,10 @@ pub enum Error {
181181
source: partition::error::Error,
182182
},
183183

184-
#[snafu(display("Failed to create AlterExpr from Alter statement, source: {}", source))]
185-
AlterExprFromStmt {
184+
#[snafu(display("Failed to create table info, source: {}", source))]
185+
CreateTableInfo {
186186
#[snafu(backtrace)]
187-
source: sql::error::Error,
187+
source: datatypes::error::Error,
188188
},
189189

190190
#[snafu(display("Failed to build CreateExpr on insertion: {}", source))]
@@ -226,17 +226,6 @@ pub enum Error {
226226
backtrace: Backtrace,
227227
},
228228

229-
#[snafu(display(
230-
"Failed to find leader peer for region {} in table {}",
231-
region,
232-
table_name
233-
))]
234-
FindLeaderPeer {
235-
region: u64,
236-
table_name: String,
237-
backtrace: Backtrace,
238-
},
239-
240229
#[snafu(display("Cannot find primary key column by name: {}", msg))]
241230
PrimaryKeyNotFound { msg: String, backtrace: Backtrace },
242231

@@ -370,13 +359,12 @@ impl ErrorExt for Error {
370359
Error::SqlExecIntercepted { source, .. } => source.status_code(),
371360
Error::StartServer { source, .. } => source.status_code(),
372361

373-
Error::ParseSql { source } | Error::AlterExprFromStmt { source } => {
374-
source.status_code()
375-
}
362+
Error::ParseSql { source } => source.status_code(),
376363

377364
Error::Table { source } => source.status_code(),
378365

379-
Error::ConvertColumnDefaultConstraint { source, .. } => source.status_code(),
366+
Error::ConvertColumnDefaultConstraint { source, .. }
367+
| Error::CreateTableInfo { source } => source.status_code(),
380368

381369
Error::RequestDatanode { source } => source.status_code(),
382370

@@ -387,7 +375,6 @@ impl ErrorExt for Error {
387375
Error::FindDatanode { .. }
388376
| Error::CreateTableRoute { .. }
389377
| Error::FindRegionRoute { .. }
390-
| Error::FindLeaderPeer { .. }
391378
| Error::BuildDfLogicalPlan { .. }
392379
| Error::BuildTableMeta { .. } => StatusCode::Internal,
393380

src/frontend/src/expr_factory.rs

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,24 @@ use std::sync::Arc;
1717

1818
use api::helper::ColumnDataTypeWrapper;
1919
use api::v1::{Column, ColumnDataType, CreateTableExpr};
20+
use common_error::prelude::BoxedError;
21+
use datanode::instance::sql::table_idents_to_full_name;
2022
use datatypes::schema::ColumnSchema;
23+
use session::context::QueryContextRef;
2124
use snafu::{ensure, ResultExt};
2225
use sql::ast::{ColumnDef, TableConstraint};
26+
use sql::statements::column_def_to_schema;
2327
use sql::statements::create::{CreateTable, TIME_INDEX};
24-
use sql::statements::{column_def_to_schema, table_idents_to_full_name};
2528

2629
use crate::error::{
27-
BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
28-
InvalidSqlSnafu, ParseSqlSnafu, Result,
30+
self, BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu,
31+
ConvertColumnDefaultConstraintSnafu, InvalidSqlSnafu, ParseSqlSnafu, Result,
2932
};
3033

3134
pub type CreateExprFactoryRef = Arc<dyn CreateExprFactory + Send + Sync>;
3235

3336
#[async_trait::async_trait]
3437
pub trait CreateExprFactory {
35-
async fn create_expr_by_stmt(&self, stmt: &CreateTable) -> Result<CreateTableExpr>;
36-
3738
async fn create_expr_by_columns(
3839
&self,
3940
catalog_name: &str,
@@ -48,10 +49,6 @@ pub struct DefaultCreateExprFactory;
4849

4950
#[async_trait::async_trait]
5051
impl CreateExprFactory for DefaultCreateExprFactory {
51-
async fn create_expr_by_stmt(&self, stmt: &CreateTable) -> Result<CreateTableExpr> {
52-
create_to_expr(None, vec![0], stmt)
53-
}
54-
5552
async fn create_expr_by_columns(
5653
&self,
5754
catalog_name: &str,
@@ -74,13 +71,14 @@ impl CreateExprFactory for DefaultCreateExprFactory {
7471
}
7572

7673
/// Convert `CreateTable` statement to `CreateExpr` gRPC request.
77-
fn create_to_expr(
78-
table_id: Option<u32>,
79-
region_ids: Vec<u32>,
74+
pub(crate) fn create_to_expr(
8075
create: &CreateTable,
76+
query_ctx: QueryContextRef,
8177
) -> Result<CreateTableExpr> {
8278
let (catalog_name, schema_name, table_name) =
83-
table_idents_to_full_name(&create.name).context(ParseSqlSnafu)?;
79+
table_idents_to_full_name(&create.name, query_ctx)
80+
.map_err(BoxedError::new)
81+
.context(error::ExternalSnafu)?;
8482

8583
let time_index = find_time_index(&create.constraints)?;
8684
let expr = CreateTableExpr {
@@ -94,8 +92,8 @@ fn create_to_expr(
9492
create_if_not_exists: create.if_not_exists,
9593
// TODO(LFC): Fill in other table options.
9694
table_options: HashMap::from([("engine".to_string(), create.engine.clone())]),
97-
table_id: table_id.map(|id| api::v1::TableId { id }),
98-
region_ids,
95+
table_id: None,
96+
region_ids: vec![],
9997
};
10098
Ok(expr)
10199
}

0 commit comments

Comments
 (0)