diff --git a/Cargo.lock b/Cargo.lock index 0fd7495092..7de80187c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -963,6 +963,7 @@ dependencies = [ "proto 0.1.0", "serde", "serde_derive", + "serde_json", "snafu 0.6.10", "sqlparser 0.19.0", ] @@ -989,6 +990,7 @@ dependencies = [ name = "common_util" version = "0.1.0" dependencies = [ + "arrow_deps 0.1.0", "backtrace", "chrono", "common_types 0.1.0", @@ -5137,6 +5139,7 @@ dependencies = [ "common_types 0.1.0", "common_util", "df_operator", + "hashbrown", "log", "paste 1.0.8", "regex", @@ -5162,6 +5165,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8ec7ef1bad82a2453dbaef7218b6f036e545edcce1ffd55f6e7af7bea43cce2" dependencies = [ "log", + "serde", ] [[package]] @@ -5950,8 +5954,8 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 0.1.10", - "rand 0.3.23", + "cfg-if 1.0.0", + "rand 0.8.5", "static_assertions 1.1.0", ] diff --git a/arrow_deps/Cargo.toml b/arrow_deps/Cargo.toml index d5b8795d04..589ced10ba 100644 --- a/arrow_deps/Cargo.toml +++ b/arrow_deps/Cargo.toml @@ -9,7 +9,7 @@ workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -arrow = "15.0.0" +arrow = { version = "15.0.0", features = ["prettyprint"]} parquet = "15.0.0" [dependencies.uncover] diff --git a/common_types/Cargo.toml b/common_types/Cargo.toml index 021994313a..c4083aecfe 100644 --- a/common_types/Cargo.toml +++ b/common_types/Cargo.toml @@ -23,6 +23,7 @@ paste = "1.0" proto = { path = "../proto" } snafu = { version ="0.6.10", features = ["backtraces"]} # TODO(yingwen): Make sqlparser support a feature -sqlparser = "0.19.0" +sqlparser = { version = "0.19.0", features = ["serde"]} serde = "1.0.81" serde_derive = "1.0.81" +serde_json = "1.0.60" diff --git a/common_types/src/column.rs b/common_types/src/column.rs index 44908687bd..b24c7b2456 100644 --- a/common_types/src/column.rs +++ b/common_types/src/column.rs @@ -575,7 +575,6 @@ impl ColumnBlock { DatumKind::from_data_type(array.data_type()).with_context(|| UnsupportedArray { data_type: array.data_type().clone(), })?; - Self::try_from_arrow_array_ref(&datum_kind, array) } diff --git a/common_types/src/column_schema.rs b/common_types/src/column_schema.rs index 29f4e2a1be..aa359fa39a 100644 --- a/common_types/src/column_schema.rs +++ b/common_types/src/column_schema.rs @@ -7,6 +7,7 @@ use std::{collections::BTreeMap, convert::TryFrom, str::FromStr}; use arrow_deps::arrow::datatypes::{DataType, Field}; use proto::common as common_pb; use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu}; +use sqlparser::ast::Expr; use crate::datum::DatumKind; @@ -61,6 +62,15 @@ pub enum Error { source: Box<dyn std::error::Error + Send + Sync>, backtrace: Backtrace, }, + #[snafu(display( "Can not deserialize default-value-option from pb data, err:{}.\nBacktrace:\n{}", source, backtrace ))] + InvalidDefaultValueData { + source: serde_json::error::Error, + backtrace: Backtrace, + }, } pub type Result<T> = std::result::Result<T, Error>; @@ -150,6 +160,8 @@ pub struct ColumnSchema { pub comment: String, /// Column name in response pub escaped_name: String, + /// Default value expr + pub default_value: Option<Expr>, } impl ColumnSchema { @@ -187,6 +199,10 @@ impl ColumnSchema { column_schema.set_is_tag(self.is_tag); column_schema.set_comment(self.comment.clone()); + if let Some(default_value) = &self.default_value {
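+ // The default-value expr is serialized as JSON bytes here; this relies on the "serde" feature enabled for sqlparser in common_types/Cargo.toml above. +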
column_schema.set_default_value(serde_json::to_vec(default_value).unwrap()); + } + column_schema } @@ -250,10 +266,21 @@ impl ColumnSchema { } } -impl From<common_pb::ColumnSchema> for ColumnSchema { - fn from(column_schema: common_pb::ColumnSchema) -> Self { +impl TryFrom<common_pb::ColumnSchema> for ColumnSchema { + type Error = Error; + + fn try_from(column_schema: common_pb::ColumnSchema) -> Result<Self> { let escaped_name = column_schema.name.escape_debug().to_string(); - Self { + let default_value_bytes = column_schema.get_default_value(); + let default_value = if !default_value_bytes.is_empty() { + let expr = serde_json::from_slice::<Expr>(default_value_bytes) .context(InvalidDefaultValueData)?; + Some(expr) + } else { + None + }; + + Ok(Self { id: column_schema.id, name: column_schema.name, data_type: DatumKind::from(column_schema.data_type), @@ -261,7 +288,8 @@ impl From<common_pb::ColumnSchema> for ColumnSchema { is_tag: column_schema.is_tag, comment: column_schema.comment, escaped_name, - } + default_value, + }) } } @@ -290,6 +318,7 @@ impl TryFrom<&Field> for ColumnSchema { is_tag, comment, escaped_name: field.name().escape_debug().to_string(), + default_value: None, }) } } @@ -357,6 +386,7 @@ pub struct Builder { is_nullable: bool, is_tag: bool, comment: String, + default_value: Option<Expr>, } impl Builder { @@ -369,6 +399,7 @@ impl Builder { is_nullable: false, is_tag: false, comment: String::new(), + default_value: None, } } @@ -394,6 +425,11 @@ impl Builder { self } + pub fn default_value(mut self, default_value: Option<Expr>) -> Self { + self.default_value = default_value; + self + } + pub fn validate(&self) -> Result<()> { if self.is_tag { ensure!( @@ -418,6 +454,7 @@ impl Builder { is_tag: self.is_tag, comment: self.comment, escaped_name, + default_value: self.default_value, }) } } @@ -449,6 +486,7 @@ mod tests { is_tag: true, comment: "Comment of this column".to_string(), escaped_name: "test_column_schema".escape_debug().to_string(), + default_value: None, }; assert_eq!(&lhs, &rhs); @@ -461,7 +499,7 @@ // Check pb specific fields assert!(pb_schema.is_tag); - let schema_from_pb = ColumnSchema::from(pb_schema); + let schema_from_pb = ColumnSchema::try_from(pb_schema).unwrap(); assert_eq!(&schema_from_pb, &column_schema); } diff --git a/common_types/src/datum.rs b/common_types/src/datum.rs index d3b51deacc..9eb40909f0 100644 --- a/common_types/src/datum.rs +++ b/common_types/src/datum.rs @@ -846,6 +846,43 @@ pub mod arrow_convert { Datum::Boolean(v) => Some(ScalarValue::Boolean(Some(*v))), } } + + pub fn from_scalar_value(val: &ScalarValue) -> Option<Datum> { + match val { + ScalarValue::Boolean(v) => v.map(Datum::Boolean), + ScalarValue::Float32(v) => v.map(Datum::Float), + ScalarValue::Float64(v) => v.map(Datum::Double), + ScalarValue::Int8(v) => v.map(Datum::Int8), + ScalarValue::Int16(v) => v.map(Datum::Int16), + ScalarValue::Int32(v) => v.map(Datum::Int32), + ScalarValue::Int64(v) => v.map(Datum::Int64), + ScalarValue::UInt8(v) => v.map(Datum::UInt8), + ScalarValue::UInt16(v) => v.map(Datum::UInt16), + ScalarValue::UInt32(v) => v.map(Datum::UInt32), + ScalarValue::UInt64(v) => v.map(Datum::UInt64), + ScalarValue::Utf8(v) | ScalarValue::LargeUtf8(v) => v + .as_ref() + .map(|v| Datum::String(StringBytes::copy_from_str(v.as_str()))), + ScalarValue::Binary(v) | ScalarValue::LargeBinary(v) => v + .as_ref() + .map(|v| Datum::Varbinary(Bytes::copy_from_slice(v.as_slice()))), + ScalarValue::TimestampMillisecond(v, _) => { + v.map(|v| Datum::Timestamp(Timestamp::new(v))) + } + ScalarValue::List(_, _) + | ScalarValue::Date32(_) + | ScalarValue::Date64(_) + |
ScalarValue::TimestampSecond(_, _) + | ScalarValue::TimestampMicrosecond(_, _) + | ScalarValue::TimestampNanosecond(_, _) + | ScalarValue::IntervalYearMonth(_) + | ScalarValue::IntervalDayTime(_) + | ScalarValue::Struct(_, _) + | ScalarValue::Decimal128(_, _, _) + | ScalarValue::Null + | ScalarValue::IntervalMonthDayNano(_) => None, + } + } } impl<'a> DatumView<'a> { diff --git a/common_types/src/schema.rs b/common_types/src/schema.rs index 30e4ee24db..f86924d433 100644 --- a/common_types/src/schema.rs +++ b/common_types/src/schema.rs @@ -150,6 +150,9 @@ pub enum Error { key: ArrowSchemaMetaKey, backtrace: Backtrace, }, + + #[snafu(display("Failed to deserialize column schema from pb.\nerr:\n{}", source))] + ColumnSchemaDeserializeFailed { source: crate::column_schema::Error }, } pub type CatalogName = String; @@ -825,7 +828,8 @@ impl TryFrom<common_pb::TableSchema> for Schema { .enable_tsid_primary_key(schema.enable_tsid_primary_key); for (i, column_schema_pb) in schema.columns.into_iter().enumerate() { - let column = ColumnSchema::from(column_schema_pb); + let column = + ColumnSchema::try_from(column_schema_pb).context(ColumnSchemaDeserializeFailed)?; if i < schema.num_key_columns as usize { builder = builder.add_key_column(column)?; diff --git a/common_types/src/tests.rs b/common_types/src/tests.rs index e20313ce1c..8c8aa5cf48 100644 --- a/common_types/src/tests.rs +++ b/common_types/src/tests.rs @@ -1,6 +1,7 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. use bytes::Bytes; +use sqlparser::ast::{BinaryOperator, Expr, Value}; use crate::{ column_schema, @@ -46,12 +47,67 @@ fn base_schema_builder() -> schema::Builder { .unwrap() } +fn default_value_schema_builder() -> schema::Builder { + schema::Builder::new() + .auto_increment_column_id(true) + .add_key_column( + column_schema::Builder::new("key1".to_string(), DatumKind::Varbinary) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_key_column( + column_schema::Builder::new("key2".to_string(), DatumKind::Timestamp) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + // The data type of a column and its default value will often differ, so we need + // to check whether the type coercion is legal and apply it when it is. + // In the following, the data type of the column is `Int64`, and the type of the + // default value expr is `Int64`. So we use this column to cover the test case where the types are the same. + column_schema::Builder::new("field1".to_string(), DatumKind::Int64) + .default_value(Some(Expr::Value(Value::Number("10".to_string(), false)))) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + // The data type of the column is `UInt32`, and the type of the default value expr is `Int64`. + // So we use this column to cover the test case where the types differ.
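+ // An Int64 literal default on a UInt32 column exercises the cast path: the insert interpreter wraps the physical expr in a TryCastExpr when the two types differ.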
+ column_schema::Builder::new("field2".to_string(), DatumKind::UInt32) + .default_value(Some(Expr::Value(Value::Number("20".to_string(), false)))) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("field3".to_string(), DatumKind::UInt32) + .default_value(Some(Expr::BinaryOp { + left: Box::new(Expr::Value(Value::Number("1".to_string(), false))), + op: BinaryOperator::Plus, + right: Box::new(Expr::Value(Value::Number("2".to_string(), false))), + })) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() +} + /// Build a schema for testing: /// (key1(varbinary), key2(timestamp), field1(double), field2(string)) pub fn build_schema() -> Schema { base_schema_builder().build().unwrap() } +/// Build a schema for testing: +/// (key1(varbinary), key2(timestamp), field1(int64, default 10), +/// field2(uint32, default 20), field3(uint32, default 1 + 2)) +pub fn build_default_value_schema() -> Schema { + default_value_schema_builder().build().unwrap() +} + pub fn build_projected_schema() -> ProjectedSchema { let schema = build_schema(); assert!(schema.num_columns() > 1); diff --git a/common_util/Cargo.toml b/common_util/Cargo.toml index d4267ce9f4..00bfa5d359 100644 --- a/common_util/Cargo.toml +++ b/common_util/Cargo.toml @@ -13,6 +13,7 @@ test = ["env_logger"] [dependencies] # In alphabetical order +arrow_deps = { path = "../arrow_deps" } backtrace = "0.3.9" common_types = { path = "../common_types", features = ["test"] } chrono = "0.4" diff --git a/common_util/src/lib.rs b/common_util/src/lib.rs index f7c2c11e31..26ef18cae0 100644 --- a/common_util/src/lib.rs +++ b/common_util/src/lib.rs @@ -13,6 +13,7 @@ pub mod codec; pub mod config; pub mod metric; pub mod panic; +pub mod record_batch; pub mod runtime; pub mod time; pub mod toml; diff --git a/common_util/src/record_batch.rs b/common_util/src/record_batch.rs new file mode 100644 index 0000000000..43e590d2bf --- /dev/null +++ b/common_util/src/record_batch.rs @@ -0,0 +1,23 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +use arrow_deps::arrow::util::pretty; +use common_types::record_batch::RecordBatch; + +/// A helper function to assert that record batches match the expected pretty-printed lines. +pub fn assert_record_batches_eq(expected: &[&str], record_batches: Vec<RecordBatch>) { + let arrow_record_batch = record_batches + .into_iter() + .map(|record| record.into_arrow_record_batch()) + .collect::<Vec<_>>(); + + let expected_lines: Vec<String> = expected.iter().map(|&s| s.into()).collect(); + let formatted = pretty::pretty_format_batches(arrow_record_batch.as_slice()) + .unwrap() + .to_string(); + let actual_lines: Vec<&str> = formatted.trim().lines().collect(); + assert_eq!( expected_lines, actual_lines, "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", expected_lines, actual_lines ); +} diff --git a/df_operator/src/lib.rs b/df_operator/src/lib.rs index 36d5f32fdf..6bc929b5aa 100644 --- a/df_operator/src/lib.rs +++ b/df_operator/src/lib.rs @@ -8,3 +8,4 @@ pub mod registry; pub mod scalar; pub mod udaf; pub mod udfs; +pub mod visitor; diff --git a/df_operator/src/visitor.rs b/df_operator/src/visitor.rs new file mode 100644 index 0000000000..7e102acd7f --- /dev/null +++ b/df_operator/src/visitor.rs @@ -0,0 +1,35 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +//!
Helper function and struct to find input columns for an Expr. + +use arrow_deps::{ + datafusion::common::Result, + datafusion_expr::{ + expr::Expr as DfLogicalExpr, + expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion}, + }, +}; + +#[derive(Default)] +struct ColumnCollector { + /// columns used by the given expr + columns: Vec<String>, +} + +impl ExpressionVisitor for ColumnCollector { + fn pre_visit(mut self, expr: &DfLogicalExpr) -> Result<Recursion<Self>> + where + Self: ExpressionVisitor, + { + if let DfLogicalExpr::Column(column) = expr { + self.columns.push(column.name.clone()) + } + Ok(Recursion::Continue(self)) + } +} + +pub fn find_columns_by_expr(expr: &DfLogicalExpr) -> Vec<String> { + let ColumnCollector { columns } = expr.accept(ColumnCollector::default()).unwrap(); + + columns +} diff --git a/interpreters/src/insert.rs b/interpreters/src/insert.rs index c2a2ddf636..79bb5ab1a9 100644 --- a/interpreters/src/insert.rs +++ b/interpreters/src/insert.rs @@ -2,8 +2,25 @@ //! Interpreter for insert statement +use std::{collections::HashMap, ops::IndexMut, sync::Arc}; + +use arrow_deps::{ + arrow::{datatypes::Schema as ArrowSchema, record_batch::RecordBatch}, + datafusion::{ + common::DFSchema, + error::DataFusionError, + logical_expr::ColumnarValue as DfColumnarValue, + optimizer::simplify_expressions::ConstEvaluator, + physical_expr::{ + create_physical_expr, execution_props::ExecutionProps, expressions::TryCastExpr, + }, + }, + datafusion_expr::{expr::Expr as DfLogicalExpr, expr_rewriter::ExprRewritable}, +}; use async_trait::async_trait; -use common_types::{column_schema::ColumnId, datum::Datum, hash::hash64}; +use common_types::{ + column::ColumnBlock, column_schema::ColumnId, datum::Datum, hash::hash64, row::RowGroup, +}; use common_util::codec::{compact::MemCompactEncoder, Encoder}; use snafu::{ResultExt, Snafu}; use sql::plan::InsertPlan; @@ -11,11 +28,23 @@ use table_engine::table::WriteRequest; use crate::{ context::Context, - interpreter::{Insert, Interpreter, InterpreterPtr, Output, Result}, + interpreter::{Insert, Interpreter, InterpreterPtr, Output, Result as InterpreterResult}, }; #[derive(Debug, Snafu)] pub enum Error { + #[snafu(display("Failed to generate datafusion expr, err:{}", source))] + DataFusionExpr { source: DataFusionError }, + + #[snafu(display( "Failed to get data type from datafusion physical expr, err:{}", source ))] + DataFusionDataType { source: DataFusionError }, + + #[snafu(display("Failed to evaluate datafusion physical expr, err:{}", source))] + DataFusionExecutor { source: DataFusionError }, + #[snafu(display("Failed to write table, err:{}", source))] WriteTable { source: table_engine::table::Error }, @@ -23,8 +52,13 @@ pub enum Error { EncodeTsid { source: common_util::codec::compact::Error, }, + + #[snafu(display("Failed to convert arrow Array to ColumnBlock, err:{}", source))] + ConvertColumnBlock { source: common_types::column::Error }, } +define_result!(Error); + pub struct InsertInterpreter { ctx: Context, plan: InsertPlan, @@ -38,10 +72,17 @@ impl InsertInterpreter { #[async_trait] impl Interpreter for InsertInterpreter { - async fn execute(mut self: Box<Self>) -> Result<Output> { + async fn execute(mut self: Box<Self>) -> InterpreterResult<Output> { // Generate tsid if needed.
- self.maybe_generate_tsid()?; - let InsertPlan { table, rows } = self.plan; + self.maybe_generate_tsid().context(Insert)?; + let InsertPlan { + table, + mut rows, + default_value_map, + } = self.plan; + + // Fill default values + fill_default_values(&mut rows, &default_value_map).context(Insert)?; // Context is unused now let _ctx = self.ctx; @@ -122,13 +163,11 @@ impl<'a> TsidBuilder<'a> { // Write column id first. self.encoder .encode(self.hash_bytes, &Datum::UInt64(u64::from(column_id))) - .context(EncodeTsid) - .context(Insert)?; + .context(EncodeTsid)?; // Write datum. self.encoder .encode(self.hash_bytes, datum) - .context(EncodeTsid) - .context(Insert)?; + .context(EncodeTsid)?; Ok(()) } @@ -136,3 +175,79 @@ impl<'a> TsidBuilder<'a> { hash64(self.hash_bytes) } } + +fn fill_default_values( + rows: &mut RowGroup, + default_value_map: &HashMap<usize, DfLogicalExpr>, +) -> Result<()> { + let input_df_schema = DFSchema::empty(); + let input_arrow_schema = Arc::new(ArrowSchema::empty()); + let input_batch = RecordBatch::new_empty(input_arrow_schema.clone()); + for (column_idx, default_value_expr) in default_value_map.iter() { + // Optimize logical expr + let execution_props = ExecutionProps::default(); + let mut const_optimizer = ConstEvaluator::new(&execution_props); + let evaluated_expr = default_value_expr + .clone() + .rewrite(&mut const_optimizer) + .context(DataFusionExpr)?; + + // Create physical expr + let execution_props = ExecutionProps::default(); + let physical_expr = create_physical_expr( + &evaluated_expr, + &input_df_schema, + &input_arrow_schema, + &execution_props, + ) + .context(DataFusionExpr)?; + + let from_type = physical_expr + .data_type(&input_arrow_schema) + .context(DataFusionDataType)?; + let to_type = rows.schema().column(*column_idx).data_type; + + let casted_physical_expr = if from_type != to_type.into() { + Arc::new(TryCastExpr::new(physical_expr, to_type.into())) + } else { + physical_expr + }; + + let output = casted_physical_expr + .evaluate(&input_batch) + .context(DataFusionExecutor)?; + + fill_column_to_row_group(*column_idx, &output, rows)?; + } + + Ok(()) +} + +fn fill_column_to_row_group( + column_idx: usize, + column: &DfColumnarValue, + rows: &mut RowGroup, +) -> Result<()> { + match column { + DfColumnarValue::Array(array) => { + for row_idx in 0..rows.num_rows() { + let datum_kind = rows.schema().column(column_idx).data_type; + let column_block = ColumnBlock::try_from_arrow_array_ref(&datum_kind, array) + .context(ConvertColumnBlock)?; + let datum = column_block.datum(row_idx); + rows.get_row_mut(row_idx) + .map(|row| std::mem::replace(row.index_mut(column_idx), datum.clone())); + } + } + DfColumnarValue::Scalar(scalar) => { + if let Some(datum) = Datum::from_scalar_value(scalar) { + for row_idx in 0..rows.num_rows() { + rows.get_row_mut(row_idx) + .map(|row| std::mem::replace(row.index_mut(column_idx), datum.clone())); + } + } + } + }; + + Ok(()) +} diff --git a/interpreters/src/interpreter.rs b/interpreters/src/interpreter.rs index 914dc190c5..ad4f8bdc1b 100644 --- a/interpreters/src/interpreter.rs +++ b/interpreters/src/interpreter.rs @@ -39,6 +39,9 @@ pub enum Error { #[snafu(display("Failed to execute exists, err:{}", source))] Exists { source: crate::exists::Error }, + + #[snafu(display("Failed to transfer output to records"))] + TryIntoRecords, } define_result!(Error); @@ -52,6 +55,18 @@ pub enum Output { Records(RecordBatchVec), } +impl TryFrom<Output> for RecordBatchVec { + type Error = Error; + + fn try_from(output: Output) -> Result<Self> { + if let
Output::Records(records) = output { + Ok(records) + } else { + Err(Error::TryIntoRecords) + } + } +} + /// Interpreter executes the plan it holds #[async_trait] pub trait Interpreter { diff --git a/interpreters/src/show_create.rs b/interpreters/src/show_create.rs index be1c2a8487..675309aab2 100644 --- a/interpreters/src/show_create.rs +++ b/interpreters/src/show_create.rs @@ -86,6 +86,10 @@ impl ShowCreateInterpreter { res += " NOT NULL"; } + if let Some(expr) = &col.default_value { + res += format!(" DEFAULT {}", expr).as_str(); + } + if !col.comment.is_empty() { res += format!(" COMMENT '{}'", col.comment).as_str(); } diff --git a/interpreters/src/tests.rs b/interpreters/src/tests.rs index f2127b6e89..77a7c3b371 100644 --- a/interpreters/src/tests.rs +++ b/interpreters/src/tests.rs @@ -78,100 +78,156 @@ where ENGINE=Analytic WITH (ttl='70d',update_mode='overwrite',arena_block_size='1KB')"; let output = self.sql_to_output(sql).await.unwrap(); - if let Output::AffectedRows(v) = output { - assert_eq!(v, 0); - } else { - panic!(); - } + assert!( + matches!(output, Output::AffectedRows(v) if v == 0), + "create table should succeed" + ); } async fn test_desc_table(&self) { let sql = "desc table test_table"; let output = self.sql_to_output(sql).await.unwrap(); - if let Output::Records(v) = output { - assert_eq!(v.len(), 1); - } else { - panic!(); - } + let records = output.try_into().unwrap(); + let expected = vec![ + "+--------+-----------+------------+-------------+--------+", + "| name | type | is_primary | is_nullable | is_tag |", + "+--------+-----------+------------+-------------+--------+", + "| key1 | varbinary | true | false | false |", + "| key2 | timestamp | true | false | false |", + "| field1 | double | false | false | false |", + "| field2 | string | false | false | false |", + "+--------+-----------+------------+-------------+--------+", + ]; + common_util::record_batch::assert_record_batches_eq(&expected, records); } async fn test_exists_table(&self) { let sql = "exists table test_table"; let output = self.sql_to_output(sql).await.unwrap(); - if let Output::Records(v) = output { - assert_eq!(v.len(), 1); - } else { - panic!(); - } + let records = output.try_into().unwrap(); + let expected = vec![ + "+--------+", + "| result |", + "+--------+", + "| 1 |", + "+--------+", + ]; + common_util::record_batch::assert_record_batches_eq(&expected, records); } async fn test_insert_table(&self) { let sql = "INSERT INTO test_table(key1, key2, field1,field2) VALUES('tagk', 1638428434000,100, 'hello3'),('tagk2', 1638428434000,100, 'hello3');"; let output = self.sql_to_output(sql).await.unwrap(); - if let Output::AffectedRows(v) = output { - assert_eq!(v, 2); - } else { - panic!(); - } + assert!( + matches!(output, Output::AffectedRows(v) if v == 2), + "insert table should succeed" + ); + } + + async fn test_insert_table_with_missing_columns(&self) { + let catalog_manager = Arc::new(build_catalog_manager(self.engine()).await); + let ctx = Context::builder(RequestId::next_id()) + .default_catalog_and_schema(DEFAULT_CATALOG.to_string(), DEFAULT_SCHEMA.to_string()) + .build(); + let insert_factory = + Factory::new(ExecutorImpl::new(), catalog_manager.clone(), self.engine()); + let insert_sql = "INSERT INTO test_missing_columns_table(key1, key2) VALUES('tagk', 1638428434000), ('tagk2', 1638428434000);"; + + let plan = sql_to_plan(&self.meta_provider, insert_sql); + let interpreter = insert_factory.create(ctx, plan); + let output = interpreter.execute().await.unwrap(); + assert!( +
matches!(output, Output::AffectedRows(v) if v == 2), + "insert should succeed" + ); + + // Check the data just inserted. + let select_sql = + "SELECT key1, key2, field1, field2, field3 from test_missing_columns_table"; + let select_factory = Factory::new(ExecutorImpl::new(), catalog_manager, self.engine()); + let ctx = Context::builder(RequestId::next_id()) + .default_catalog_and_schema(DEFAULT_CATALOG.to_string(), DEFAULT_SCHEMA.to_string()) + .build(); + let plan = sql_to_plan(&self.meta_provider, select_sql); + let interpreter = select_factory.create(ctx, plan); + let output = interpreter.execute().await.unwrap(); + let records = output.try_into().unwrap(); + + let expected = vec![ + "+------------+---------------------+--------+--------+--------+", + "| key1 | key2 | field1 | field2 | field3 |", + "+------------+---------------------+--------+--------+--------+", + "| 7461676b | 2021-12-02 07:00:34 | 10 | 20 | 3 |", + "| 7461676b32 | 2021-12-02 07:00:34 | 10 | 20 | 3 |", + "+------------+---------------------+--------+--------+--------+", + ]; + common_util::record_batch::assert_record_batches_eq(&expected, records); } async fn test_select_table(&self) { let sql = "select * from test_table"; let output = self.sql_to_output(sql).await.unwrap(); - if let Output::Records(v) = output { - assert_eq!(v.len(), 1); - assert_eq!(v[0].num_rows(), 2); - } else { - panic!(); - } + let records = output.try_into().unwrap(); + let expected = vec![ + "+------------+---------------------+--------+--------+", + "| key1 | key2 | field1 | field2 |", + "+------------+---------------------+--------+--------+", + "| 7461676b | 2021-12-02 07:00:34 | 100 | hello3 |", + "| 7461676b32 | 2021-12-02 07:00:34 | 100 | hello3 |", + "+------------+---------------------+--------+--------+", + ]; + common_util::record_batch::assert_record_batches_eq(&expected, records); let sql = "select count(*) from test_table"; let output = self.sql_to_output(sql).await.unwrap(); - if let Output::Records(v) = output { - assert_eq!(v.len(), 1); - assert_eq!(v[0].num_rows(), 1); - } else { - panic!(); - } + let records = output.try_into().unwrap(); + let expected = vec![ + "+-----------------+", + "| COUNT(UInt8(1)) |", + "+-----------------+", + "| 2 |", + "+-----------------+", + ]; + common_util::record_batch::assert_record_batches_eq(&expected, records); } async fn test_show_create_table(&self) { let sql = "show create table test_table"; let output = self.sql_to_output(sql).await.unwrap(); - if let Output::Records(v) = output { - assert_eq!(v.len(), 1); - assert_eq!(v[0].num_rows(), 1); - } else { - panic!(); - } + let records = output.try_into().unwrap(); + let expected = vec![ + "+------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+", + "| Table | Create Table |", + "+------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+", + "| test_table | CREATE TABLE `test_table` (`key1` varbinary NOT NULL, `key2` timestamp NOT NULL, `field1` double NOT NULL, `field2` string NOT NULL, PRIMARY KEY(key1,key2), TIMESTAMP KEY(key2)) ENGINE=Analytic |", +
"+------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+" + ]; + common_util::record_batch::assert_record_batches_eq(&expected, records); } async fn test_alter_table(&self) { let sql = "alter table test_table add column add_col string"; let output = self.sql_to_output(sql).await.unwrap(); - if let Output::AffectedRows(v) = output { - assert_eq!(v, 0); - } else { - panic!(); - } + assert!( + matches!(output, Output::AffectedRows(v) if v == 0), + "alter table should success" + ); let sql = "alter table test_table modify SETTING ttl='9d'"; let output = self.sql_to_output(sql).await.unwrap(); - if let Output::AffectedRows(v) = output { - assert_eq!(v, 0); - } else { - panic!(); - } + assert!( + matches!(output, Output::AffectedRows(v) if v == 0), + "alter table should success" + ); } async fn test_drop_table(&self) { let sql = "drop table test_table"; let output = self.sql_to_output(sql).await.unwrap(); - if let Output::AffectedRows(v) = output { - assert_eq!(v, 0); - } else { - panic!(); - } + assert!( + matches!(output, Output::AffectedRows(v) if v == 0), + "alter table should success" + ); } } @@ -201,4 +257,6 @@ where env.test_show_create_table().await; env.test_alter_table().await; env.test_drop_table().await; + + env.test_insert_table_with_missing_columns().await; } diff --git a/proto/protos/common.proto b/proto/protos/common.proto index dc917685a7..9ea121b18f 100644 --- a/proto/protos/common.proto +++ b/proto/protos/common.proto @@ -38,6 +38,8 @@ message ColumnSchema { bool is_tag = 5; // Comment of the column string comment = 6; + // Default value expr of the column + optional bytes default_value = 7; } // Table Schema diff --git a/server/src/grpc/write.rs b/server/src/grpc/write.rs index 76d05ed3db..f4d6ecb16e 100644 --- a/server/src/grpc/write.rs +++ b/server/src/grpc/write.rs @@ -262,6 +262,7 @@ fn write_metric_to_insert_plan( Ok(InsertPlan { table, rows: row_group, + default_value_map: HashMap::new(), }) } @@ -505,6 +506,7 @@ mod test { is_tag: false, comment: String::new(), escaped_name: TIMESTAMP_COLUMN_NAME.escape_debug().to_string(), + default_value: None, }) .unwrap() .add_key_column(ColumnSchema { @@ -515,6 +517,7 @@ mod test { is_tag: true, comment: String::new(), escaped_name: TAG_K.escape_debug().to_string(), + default_value: None, }) .unwrap() .add_normal_column(ColumnSchema { @@ -525,6 +528,7 @@ mod test { is_tag: true, comment: String::new(), escaped_name: TAG_K1.escape_debug().to_string(), + default_value: None, }) .unwrap() .add_normal_column(ColumnSchema { @@ -535,6 +539,7 @@ mod test { is_tag: false, comment: String::new(), escaped_name: FIELD_NAME.escape_debug().to_string(), + default_value: None, }) .unwrap() .add_normal_column(ColumnSchema { @@ -545,6 +550,7 @@ mod test { is_tag: false, comment: String::new(), escaped_name: FIELD_NAME1.escape_debug().to_string(), + default_value: None, }) .unwrap() .build() diff --git a/server/src/http.rs b/server/src/http.rs index 6f4a908d41..35d2224ec1 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -36,7 +36,7 @@ pub enum Error { #[snafu(display("Failed to handle request, err:{}", source))] HandleRequest { - source: crate::handlers::error::Error, + source: Box, }, #[snafu(display("Missing runtimes to build service.\nBacktrace:\n{}", backtrace))] @@ -130,7 +130,7 @@ impl Service { .map_err(|e| { // TODO(yingwen): Maybe truncate and print the sql error!("Http 
service Failed to handle sql, err:{}", e); - e + Box::new(e) }) .context(HandleRequest); match result { @@ -282,7 +282,7 @@ impl Service { .await .map_err(|e| { error!("Http service failed to handle admin reject, err:{}", e); - e + Box::new(e) }) .context(HandleRequest); diff --git a/sql/Cargo.toml b/sql/Cargo.toml index 79e63721f2..8a77d670f3 100644 --- a/sql/Cargo.toml +++ b/sql/Cargo.toml @@ -19,6 +19,7 @@ ceresdbproto_deps = { path = "../ceresdbproto_deps" } common_types = { path = "../common_types"} common_util = { path = "../common_util" } df_operator = { path = "../df_operator" } +hashbrown = { version = "0.12", features = ["raw"] } log = "0.4" paste = "1.0" snafu = { version ="0.6.10", features = ["backtraces"]} diff --git a/sql/src/parser.rs b/sql/src/parser.rs index 101a172a57..c964bbe935 100644 --- a/sql/src/parser.rs +++ b/sql/src/parser.rs @@ -7,7 +7,7 @@ use log::debug; use paste::paste; use sqlparser::{ - ast::{ColumnDef, ColumnOption, ColumnOptionDef, Ident, TableConstraint}, + ast::{ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, TableConstraint}, dialect::{keywords::Keyword, Dialect, MySqlDialect}, parser::{IsOptional::Mandatory, Parser as SqlParser, ParserError}, tokenizer::{Token, Tokenizer}, @@ -67,6 +67,16 @@ pub fn get_column_comment(opt: &ColumnOption) -> Option<String> { None } +/// Get the default value expr from [`ColumnOption`] if it is a default-value +/// option. +pub fn get_default_value(opt: &ColumnOption) -> Option<Expr> { + if let ColumnOption::Default(expr) = opt { + return Some(expr.clone()); + } + + None +} + /// Returns true when is a TIMESTAMP KEY table constraint pub fn is_timestamp_key_constraint(constrait: &TableConstraint) -> bool { if let TableConstraint::Unique { diff --git a/sql/src/plan.rs b/sql/src/plan.rs index ef9fec2921..54aedea713 100644 --- a/sql/src/plan.rs +++ b/sql/src/plan.rs @@ -9,7 +9,10 @@ use std::{ sync::Arc, }; -use arrow_deps::datafusion::logical_plan::LogicalPlan as DataFusionLogicalPlan; +use arrow_deps::{ + datafusion::logical_plan::LogicalPlan as DataFusionLogicalPlan, + datafusion_expr::expr::Expr as DfLogicalExpr, +}; use common_types::{column_schema::ColumnSchema, row::RowGroup, schema::Schema}; use common_util::define_result; use snafu::Snafu; @@ -121,6 +124,9 @@ pub struct InsertPlan { pub table: TableRef, /// RowGroup to insert pub rows: RowGroup, + /// Map from column index in the schema to its default-value expr, used to + /// fill in missing values + pub default_value_map: HashMap<usize, DfLogicalExpr>, } #[derive(Debug)] diff --git a/sql/src/planner.rs b/sql/src/planner.rs index 187ab87bea..aaff7f3fb4 100644 --- a/sql/src/planner.rs +++ b/sql/src/planner.rs @@ -9,7 +9,20 @@ use std::{ sync::Arc, }; -use arrow_deps::datafusion::{error::DataFusionError, sql::planner::SqlToRel}; +use arrow_deps::{ + arrow::{ + compute::can_cast_types, + datatypes::{DataType as ArrowDataType, Schema as ArrowSchema}, + }, + datafusion::{ + common::{DFField, DFSchema}, + error::DataFusionError, + optimizer::simplify_expressions::ConstEvaluator, + physical_expr::{create_physical_expr, execution_props::ExecutionProps}, + sql::planner::SqlToRel, + }, + datafusion_expr::expr_rewriter::ExprRewritable, +}; use common_types::{ column_schema::{self, ColumnSchema}, datum::{Datum, DatumKind}, @@ -17,6 +30,8 @@ use common_types::{ row::{RowGroup, RowGroupBuilder}, schema::{self, Schema, TSID_COLUMN}, }; +use df_operator::visitor::find_columns_by_expr; +use hashbrown::HashMap as NoStdHashMap; use log::debug; use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu}; use sqlparser::ast::{
@@ -45,9 +60,21 @@ use crate::{ // should be easy to find out the reason. #[derive(Debug, Snafu)] pub enum Error { - #[snafu(display("DataFusion Failed to plan, err:{}", source))] + #[snafu(display("Failed to generate datafusion plan, err:{}", source))] DataFusionPlan { source: DataFusionError }, + #[snafu(display("Failed to create datafusion schema, err:{}", source))] + DataFusionSchema { source: DataFusionError }, + + #[snafu(display("Failed to generate datafusion expr, err:{}", source))] + DataFusionExpr { source: DataFusionError }, + + #[snafu(display( "Failed to get data type from datafusion physical expr, err:{}", source ))] + DataFusionDataType { source: DataFusionError }, + // Statement is too large and complicate to carry in Error, so we // only return error here, so the caller should attach sql to its // error context @@ -57,6 +84,9 @@ pub enum Error { #[snafu(display("Create table name is empty"))] CreateTableNameEmpty, + #[snafu(display("Only support non-column-input expr in default-value-option, column name:{}, default value:{}", name, default_value))] + CreateWithComplexDefaultValue { name: String, default_value: Expr }, + #[snafu(display("Table must contain timestamp constraint"))] RequireTimestamp, @@ -162,6 +192,18 @@ pub enum Error { #[snafu(display("Failed to build plan from promql, error:{}", source))] BuildPromPlanError { source: crate::promql::Error }, + + #[snafu(display( "Failed to cast default value expr to column type, expr:{}, from:{}, to:{}", expr, from, to ))] + InvalidDefaultValueCoercion { + expr: Expr, + from: ArrowDataType, + to: ArrowDataType, + }, } define_result!(Error); @@ -296,6 +338,9 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> { .map(|col| Ok((col.name.value.as_str(), parse_column(col)?))) .collect::<Result<BTreeMap<_, _>>>()?; + + // Analyze default value options.
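+ // A default-value expr must be constant: it may not reference other columns, must const-fold, and its result type must be castable to the column type.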
+ analyze_column_default_value_options(&name_column_map, &self.meta_provider)?; + + // Tsid column is a reserved column. ensure!( !name_column_map.contains_key(TSID_COLUMN), @@ -447,8 +492,26 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> { validate_insert_stmt(table.name(), &schema, &column_names_idx)?; + let df_fields = schema + .columns() + .iter() + .map(|column_schema| { + DFField::new( + None, + &column_schema.name, + column_schema.data_type.to_arrow_data_type(), + column_schema.is_nullable, + ) + }) + .collect::<Vec<_>>(); + let df_schema = DFSchema::new_with_metadata(df_fields, HashMap::new()) + .context(DataFusionSchema)?; + let df_planner = SqlToRel::new(&self.meta_provider); + // Index in insert values stmt of each column in table schema let mut column_index_in_insert = Vec::with_capacity(schema.num_columns()); + // Column index in schema to its default-value-expr + let mut default_value_map = HashMap::new(); // Check all not null columns are provided in stmt, also init // `column_index_in_insert` @@ -467,10 +530,17 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> { } None => { // This column in schema is not in insert stmt - if column.is_nullable { + if let Some(expr) = &column.default_value { + let expr = df_planner + .sql_to_rex(expr.clone(), &df_schema, &mut NoStdHashMap::new()) + .context(DataFusionExpr)?; + + default_value_map.insert(idx, expr); + column_index_in_insert.push(InsertMode::Auto); + } else if column.is_nullable { column_index_in_insert.push(InsertMode::Null); } else { - // Column is not null and input does not contains that column + // Column cannot be null and input does not contain that column return InsertMissingColumn { table: table.name(), column: &column.name, @@ -483,7 +553,11 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> { let rows = build_row_group(schema, source, column_index_in_insert)?; - Ok(Plan::Insert(InsertPlan { table, rows })) + Ok(Plan::Insert(InsertPlan { + table, + rows, + default_value_map, + })) } // We already known this stmt is a INSERT stmt _ => unreachable!(), } @@ -729,6 +803,7 @@ fn parse_column(col: &ColumnDef) -> Result<ColumnSchema> { let mut is_tag = false; let mut is_unsign = false; let mut comment = String::new(); + let mut default_value = None; for option_def in &col.options { if matches!(option_def.option, ColumnOption::NotNull) { is_nullable = false; @@ -736,6 +811,8 @@ is_tag = true; } else if parser::is_unsign_column(&option_def.option) { is_unsign = true; + } else if let Some(default_value_expr) = parser::get_default_value(&option_def.option) { + default_value = Some(default_value_expr); } else if let Some(v) = parser::get_column_comment(&option_def.option) { comment = v; } @@ -750,13 +827,79 @@ let builder = column_schema::Builder::new(col.name.value.clone(), data_type) .is_nullable(is_nullable) .is_tag(is_tag) - .comment(comment); + .comment(comment) + .default_value(default_value); builder.build().context(InvalidColumnSchema { column_name: &col.name.value, }) } +// Analyze default value exprs.
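+// The analysis has three steps: reject exprs that reference other columns (via +// find_columns_by_expr), const-fold the expr with datafusion's ConstEvaluator, and +// verify that the folded type can be cast to the declared column type.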
+fn analyze_column_default_value_options<'a, P: MetaProvider>( + name_column_map: &BTreeMap<&str, ColumnSchema>, + meta_provider: &ContextProviderAdapter<'a, P>, +) -> Result<()> { + let df_planner = SqlToRel::new(meta_provider); + let df_fields = name_column_map + .iter() + .map(|(name, column_def)| { + DFField::new( + None, + name, + column_def.data_type.to_arrow_data_type(), + column_def.is_nullable, + ) + }) + .collect::<Vec<_>>(); + let df_schema = + DFSchema::new_with_metadata(df_fields, HashMap::new()).context(DataFusionSchema)?; + for column_def in name_column_map.values() { + if let Some(expr) = &column_def.default_value { + let df_logical_expr = df_planner + .sql_to_rex(expr.clone(), &df_schema, &mut NoStdHashMap::new()) + .context(DataFusionExpr)?; + + // Check input columns for the expr. Currently only support exprs without input columns. + // Tracking issue: https://github.com/CeresDB/ceresdb/issues/252. + ensure!( + find_columns_by_expr(&df_logical_expr).is_empty(), + CreateWithComplexDefaultValue { + name: column_def.name.clone(), + default_value: expr.clone(), + } + ); + // Optimize expr + let execution_props = ExecutionProps::default(); + let mut const_optimizer = ConstEvaluator::new(&execution_props); + let evaluated_expr = df_logical_expr + .rewrite(&mut const_optimizer) + .context(DataFusionExpr)?; + + // Check if the return type of the expr can be cast to the target type + let physical_expr = create_physical_expr( + &evaluated_expr, + &DFSchema::empty(), + &ArrowSchema::empty(), + &execution_props, + ) + .context(DataFusionExpr)?; + let from_type = physical_expr + .data_type(&ArrowSchema::empty()) + .context(DataFusionDataType)?; + ensure! { + can_cast_types(&from_type, &column_def.data_type.into()), + InvalidDefaultValueCoercion { + expr: expr.clone(), + from: from_type, + to: column_def.data_type.into(), + }, + } + } + } + Ok(()) +} + #[cfg(test)] mod tests { use sqlparser::ast::{Ident, Value}; @@ -837,7 +980,10 @@ mod tests { #[test] fn test_create_statement_to_plan() { - let sql = "CREATE TABLE IF NOT EXISTS t(c1 string tag not null,ts timestamp not null, c3 string, timestamp key(ts),primary key(c1, ts)) \ + let sql = "CREATE TABLE IF NOT EXISTS t(c1 string tag not null, + ts timestamp not null, + c3 string, c4 uint32 Default 0, + c5 uint32 Default 1+1, timestamp key(ts),primary key(c1, ts)) \ ENGINE=Analytic WITH (ttl='70d',update_mode='overwrite',arena_block_size='1KB')"; quick_test( sql, @@ -861,6 +1007,7 @@ is_tag: true, comment: "", escaped_name: "c1", + default_value: None, }, ColumnSchema { id: 2, @@ -870,6 +1017,7 @@ is_tag: false, comment: "", escaped_name: "ts", + default_value: None, }, ColumnSchema { id: 3, @@ -879,6 +1027,50 @@ is_tag: false, comment: "", escaped_name: "c3", + default_value: None, + }, + ColumnSchema { + id: 4, + name: "c4", + data_type: UInt32, + is_nullable: true, + is_tag: false, + comment: "", + escaped_name: "c4", + default_value: Some( + Value( + Number( + "0", + false, + ), + ), + ), + }, + ColumnSchema { + id: 5, + name: "c5", + data_type: UInt32, + is_nullable: true, + is_tag: false, + comment: "", + escaped_name: "c5", + default_value: Some( + BinaryOp { + left: Value( + Number( + "1", + false, + ), + ), + op: Plus, + right: Value( + Number( + "1", + false, + ), + ), + }, + ), }, ], }, @@ -895,6 +1087,30 @@ .unwrap(); } + #[test] + fn test_create_table_failed() { + // Currently only non-input exprs are supported as column default values.
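+ // `c4 uint32 Default c3` below references another column, so planning must fail.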
+ // TODO(ygf11): remove this test after we support complex + // default-value-expr. + let sql = "CREATE TABLE IF NOT EXISTS t(c1 string tag not null, + ts timestamp not null, + c3 uint32 Default 0, + c4 uint32 Default c3, timestamp key(ts),primary key(c1, ts)) \ ENGINE=Analytic WITH (ttl='70d',update_mode='overwrite',arena_block_size='1KB')"; + assert!(quick_test(sql, "").is_err()); + + // We need to cast the data type of the default-value-expr to the column data type + // when a default-value-expr is present, so the planner will check whether this cast is + // allowed. + // bool -> timestamp is not allowed in Arrow. + let sql = "CREATE TABLE IF NOT EXISTS t(c1 string tag not null, + ts timestamp not null, + c3 timestamp Default 1 > 2, + timestamp key(ts),primary key(c1, ts)) \ ENGINE=Analytic WITH (ttl='70d',update_mode='overwrite',arena_block_size='1KB')"; + assert!(quick_test(sql, "").is_err()); + } + #[test] fn test_query_statement_to_plan() { let sql = "select * from test_tablex;"; @@ -911,10 +1127,10 @@ #[test] fn test_insert_statement_to_plan() { - let sql = "INSERT INTO test_tablex(key1, key2, field1,field2) VALUES('tagk', 1638428434000,100, 'hello3');"; + let sql = "INSERT INTO test_tablex(key1, key2, field1, field2) VALUES('tagk', 1638428434000, 100, 'hello3');"; assert!(quick_test(sql, "").is_err()); - let sql = "INSERT INTO test_table(key1, key2, field1,field2) VALUES('tagk', 1638428434000,100, 'hello3');"; + let sql = "INSERT INTO test_table(key1, key2, field1, field2) VALUES('tagk', 1638428434000, 100, 'hello3');"; quick_test( sql, r#"Insert( @@ -939,6 +1155,7 @@ is_tag: false, comment: "", escaped_name: "key1", + default_value: None, }, ColumnSchema { id: 2, @@ -948,6 +1165,7 @@ is_tag: false, comment: "", escaped_name: "key2", + default_value: None, }, ColumnSchema { id: 3, @@ -957,6 +1175,7 @@ is_tag: false, comment: "", escaped_name: "field1", + default_value: None, }, ColumnSchema { id: 4, @@ -966,6 +1185,7 @@ is_tag: false, comment: "", escaped_name: "field2", + default_value: None, }, ], }, @@ -988,6 +1208,7 @@ is_tag: false, comment: "", escaped_name: "key1", + default_value: None, }, ColumnSchema { id: 2, @@ -997,6 +1218,7 @@ is_tag: false, comment: "", escaped_name: "key2", + default_value: None, }, ColumnSchema { id: 3, @@ -1006,6 +1228,7 @@ is_tag: false, comment: "", escaped_name: "field1", + default_value: None, }, ColumnSchema { id: 4, @@ -1015,6 +1238,7 @@ is_tag: false, comment: "", escaped_name: "field2", + default_value: None, }, ], }, @@ -1049,6 +1273,7 @@ 1638428434000, ), }, + default_value_map: {}, }, )"#, ) @@ -1117,6 +1342,7 @@ is_tag: false, comment: "", escaped_name: "key1", + default_value: None, }, ColumnSchema { id: 2, @@ -1126,6 +1352,7 @@ is_tag: false, comment: "", escaped_name: "key2", + default_value: None, }, ColumnSchema { id: 3, @@ -1135,6 +1362,7 @@ is_tag: false, comment: "", escaped_name: "field1", + default_value: None, }, ColumnSchema { id: 4, @@ -1144,6 +1372,7 @@ is_tag: false, comment: "", escaped_name: "field2", + default_value: None, }, ], }, @@ -1186,6 +1415,7 @@ is_tag: false, comment: "", escaped_name: "key1", + default_value: None, }, ColumnSchema { id: 2, @@ -1195,6 +1425,7 @@ is_tag: false, comment: "", escaped_name: "key2", + default_value: None, }, ColumnSchema { id: 3, @@ -1204,6 +1435,7 @@ is_tag: false, comment: "", escaped_name:
"field1", + default_value: None, }, ColumnSchema { id: 4, @@ -1213,6 +1445,7 @@ mod tests { is_tag: false, comment: "", escaped_name: "field2", + default_value: None, }, ], }, @@ -1229,6 +1462,7 @@ mod tests { is_tag: false, comment: "", escaped_name: "add_col", + default_value: None, }, ], ), @@ -1268,6 +1502,7 @@ mod tests { is_tag: false, comment: "", escaped_name: "key1", + default_value: None, }, ColumnSchema { id: 2, @@ -1277,6 +1512,7 @@ mod tests { is_tag: false, comment: "", escaped_name: "key2", + default_value: None, }, ColumnSchema { id: 3, @@ -1286,6 +1522,7 @@ mod tests { is_tag: false, comment: "", escaped_name: "field1", + default_value: None, }, ColumnSchema { id: 4, @@ -1295,6 +1532,7 @@ mod tests { is_tag: false, comment: "", escaped_name: "field2", + default_value: None, }, ], }, @@ -1343,6 +1581,7 @@ mod tests { is_tag: false, comment: "", escaped_name: "key1", + default_value: None, }, ColumnSchema { id: 2, @@ -1352,6 +1591,7 @@ mod tests { is_tag: false, comment: "", escaped_name: "key2", + default_value: None, }, ColumnSchema { id: 3, @@ -1361,6 +1601,7 @@ mod tests { is_tag: false, comment: "", escaped_name: "field1", + default_value: None, }, ColumnSchema { id: 4, @@ -1370,6 +1611,7 @@ mod tests { is_tag: false, comment: "", escaped_name: "field2", + default_value: None, }, ], }, diff --git a/sql/src/tests.rs b/sql/src/tests.rs index 3cb74b0c3a..4cc69b9010 100644 --- a/sql/src/tests.rs +++ b/sql/src/tests.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use arrow_deps::datafusion::catalog::TableReference; use catalog::consts::{DEFAULT_CATALOG, DEFAULT_SCHEMA}; -use common_types::tests::build_schema; +use common_types::tests::{build_default_value_schema, build_schema}; use df_operator::{scalar::ScalarUdf, udaf::AggregateUdf}; use table_engine::{ memory::MemoryTable, @@ -34,6 +34,12 @@ impl Default for MockMetaProvider { build_schema(), ANALYTIC_ENGINE_TYPE.to_string(), )), + Arc::new(MemoryTable::new( + "test_missing_columns_table".to_string(), + TableId::from(102), + build_default_value_schema(), + ANALYTIC_ENGINE_TYPE.to_string(), + )), ], } } diff --git a/tests/cases/local/05_ddl/create_tables.result b/tests/cases/local/05_ddl/create_tables.result index 872f3650a8..fa1f5975d0 100644 --- a/tests/cases/local/05_ddl/create_tables.result +++ b/tests/cases/local/05_ddl/create_tables.result @@ -181,6 +181,20 @@ CREATE TABLE `05_create_tables_t8`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter, query: CREATE TABLE `05_create_tables_t8`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'unknown');. Caused by: Failed to execute create table, err:Failed to create table, name:05_create_tables_t8, err:Failed to create table, err:Invalid arguments, err:Invalid options, space_id:2, table:05_create_tables_t8, table_id:2199023255725, err:Unknown storage format. value:\"unknown\"." 
}) +CREATE TABLE `05_create_tables_t9`(c1 int, c2 bigint default 0, c3 uint32 default 1 + 1, c4 string default 'xxx', t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic; + +affected_rows: 0 + +show create table `05_create_tables_t9`; + +Table,Create Table, +String(StringBytes(b"05_create_tables_t9")),String(StringBytes(b"CREATE TABLE `05_create_tables_t9` (`t1` timestamp NOT NULL, `tsid` uint64 NOT NULL, `c1` int, `c2` bigint DEFAULT 0, `c3` uint32 DEFAULT 1 + 1, `c4` string DEFAULT 'xxx', PRIMARY KEY(t1,tsid), TIMESTAMP KEY(t1)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='COLUMNAR', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), + + +drop table `05_create_tables_t9`; + +affected_rows: 0 + DROP TABLE IF EXISTS `05_create_tables_t`; affected_rows: 0 diff --git a/tests/cases/local/05_ddl/create_tables.sql b/tests/cases/local/05_ddl/create_tables.sql index 3d7082a672..8b8b21eee1 100644 --- a/tests/cases/local/05_ddl/create_tables.sql +++ b/tests/cases/local/05_ddl/create_tables.sql @@ -62,6 +62,10 @@ show create table `05_create_tables_t8`; drop table `05_create_tables_t8`; CREATE TABLE `05_create_tables_t8`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'unknown'); +-- Default value options +CREATE TABLE `05_create_tables_t9`(c1 int, c2 bigint default 0, c3 uint32 default 1 + 1, c4 string default 'xxx', t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic; +show create table `05_create_tables_t9`; +drop table `05_create_tables_t9`; DROP TABLE IF EXISTS `05_create_tables_t`; DROP TABLE IF EXISTS `05_create_tables_t2`; diff --git a/tests/cases/local/06_show/show_create_table.result b/tests/cases/local/06_show/show_create_table.result index e1770783b5..73c4f00bbb 100644 --- a/tests/cases/local/06_show/show_create_table.result +++ b/tests/cases/local/06_show/show_create_table.result @@ -17,7 +17,7 @@ affected_rows: 0 SHOW CREATE TABLE `06_show_a`; Table,Create Table, -String(StringBytes(b"06_show_a")),String(StringBytes(b"CREATE TABLE `06_show_a` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `a` bigint, `b` int, `c` string, `d` smallint, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='COLUMNAR', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), +String(StringBytes(b"06_show_a")),String(StringBytes(b"CREATE TABLE `06_show_a` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `a` bigint, `b` int DEFAULT 3, `c` string DEFAULT 'x', `d` smallint, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='COLUMNAR', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), CREATE TABLE `06_show_b` (a bigint, b int null default null, c string, d smallint null, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; @@ -27,7 +27,7 @@ affected_rows: 0 SHOW CREATE TABLE `06_show_b`; Table,Create Table, -String(StringBytes(b"06_show_b")),String(StringBytes(b"CREATE TABLE `06_show_b` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `a` bigint, `b` int, `c` string, `d` smallint, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) 
ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='COLUMNAR', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), +String(StringBytes(b"06_show_b")),String(StringBytes(b"CREATE TABLE `06_show_b` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `a` bigint, `b` int DEFAULT NULL, `c` string, `d` smallint, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='COLUMNAR', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), CREATE TABLE `06_show_c` (a int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; diff --git a/tests/cases/local/07_optimizer/optimizer.result b/tests/cases/local/07_optimizer/optimizer.result index bb8e19d61b..7eb028ea63 100644 --- a/tests/cases/local/07_optimizer/optimizer.result +++ b/tests/cases/local/07_optimizer/optimizer.result @@ -10,7 +10,7 @@ EXPLAIN SELECT max(value) AS c1, avg(value) AS c2 FROM `07_optimizer_t` GROUP BY plan_type,plan, String(StringBytes(b"logical_plan")),String(StringBytes(b"Projection: #MAX(07_optimizer_t.value) AS c1, #AVG(07_optimizer_t.value) AS c2\n Aggregate: groupBy=[[#07_optimizer_t.name]], aggr=[[MAX(#07_optimizer_t.value), AVG(#07_optimizer_t.value)]]\n TableScan: 07_optimizer_t projection=Some([name, value])")), -String(StringBytes(b"physical_plan")),String(StringBytes(b"ProjectionExec: expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n CoalesceBatchesExec: target_batch_size=4096\n RepartitionExec: partitioning=Hash([Column { name: \"name\", index: 0 }], 6)\n AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n ScanTable: table=07_optimizer_t, parallelism=8, order=None, \n")), +String(StringBytes(b"physical_plan")),String(StringBytes(b"ProjectionExec: expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n CoalesceBatchesExec: target_batch_size=4096\n RepartitionExec: partitioning=Hash([Column { name: \"name\", index: 0 }], 32)\n AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n RepartitionExec: partitioning=RoundRobinBatch(32)\n ScanTable: table=07_optimizer_t, parallelism=8, order=None, \n")), DROP TABLE `07_optimizer_t`;