diff --git a/Cargo.lock b/Cargo.lock index 6e88493e928e..bb17ea942ccc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5325,7 +5325,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=d75496d5d09dedcd0edcade57ccf0a522f4393ae#d75496d5d09dedcd0edcade57ccf0a522f4393ae" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=295be556573f7f90b6b004d046d3f2f127a5739f#295be556573f7f90b6b004d046d3f2f127a5739f" dependencies = [ "prost 0.13.5", "prost-types 0.13.5", diff --git a/Cargo.toml b/Cargo.toml index 1185c9b2d573..da7bdc4943d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -147,7 +147,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "d75496d5d09dedcd0edcade57ccf0a522f4393ae" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "295be556573f7f90b6b004d046d3f2f127a5739f" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index a091d997997b..db25acfd0ec6 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -603,6 +603,7 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str { Some(Expr::CreateView(_)) => "ddl.create_view", Some(Expr::DropView(_)) => "ddl.drop_view", Some(Expr::AlterDatabase(_)) => "ddl.alter_database", + Some(Expr::CommentOn(_)) => "ddl.comment_on", None => "ddl.empty", } } diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index e12331d4a26b..7af538f785f3 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -31,6 +31,7 @@ use crate::region_registry::LeaderRegionRegistryRef; pub mod alter_database; pub mod alter_logical_tables; pub mod alter_table; +pub mod comment_on; pub mod create_database; pub mod create_flow; pub mod create_logical_tables; diff --git a/src/common/meta/src/ddl/alter_table/executor.rs b/src/common/meta/src/ddl/alter_table/executor.rs index a5e843dd081f..5e44023f357d 100644 --- a/src/common/meta/src/ddl/alter_table/executor.rs +++ b/src/common/meta/src/ddl/alter_table/executor.rs @@ -301,8 +301,8 @@ fn build_new_table_info( | AlterKind::UnsetTableOptions { .. } | AlterKind::SetIndexes { .. } | AlterKind::UnsetIndexes { .. } - | AlterKind::DropDefaults { .. } => {} - AlterKind::SetDefaults { .. } => {} + | AlterKind::DropDefaults { .. } + | AlterKind::SetDefaults { .. } => {} } info!( diff --git a/src/common/meta/src/ddl/comment_on.rs b/src/common/meta/src/ddl/comment_on.rs new file mode 100644 index 000000000000..9dd46d6da9fe --- /dev/null +++ b/src/common/meta/src/ddl/comment_on.rs @@ -0,0 +1,509 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use async_trait::async_trait; +use chrono::Utc; +use common_catalog::format_full_table_name; +use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu}; +use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; +use common_telemetry::tracing::info; +use datatypes::schema::COMMENT_KEY as COLUMN_COMMENT_KEY; +use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt, ensure}; +use store_api::storage::TableId; +use strum::AsRefStr; +use table::metadata::RawTableInfo; +use table::requests::COMMENT_KEY as TABLE_COMMENT_KEY; +use table::table_name::TableName; + +use crate::cache_invalidator::Context; +use crate::ddl::DdlContext; +use crate::ddl::utils::map_to_procedure_error; +use crate::error::{ColumnNotFoundSnafu, FlowNotFoundSnafu, Result, TableNotFoundSnafu}; +use crate::instruction::CacheIdent; +use crate::key::flow::flow_info::{FlowInfoKey, FlowInfoValue}; +use crate::key::table_info::{TableInfoKey, TableInfoValue}; +use crate::key::table_name::TableNameKey; +use crate::key::{DeserializedValueWithBytes, FlowId, MetadataKey, MetadataValue}; +use crate::lock_key::{CatalogLock, FlowNameLock, SchemaLock, TableNameLock}; +use crate::rpc::ddl::{CommentObjectType, CommentOnTask}; +use crate::rpc::store::PutRequest; + +pub struct CommentOnProcedure { + pub context: DdlContext, + pub data: CommentOnData, +} + +impl CommentOnProcedure { + pub const TYPE_NAME: &'static str = "metasrv-procedure::CommentOn"; + + pub fn new(task: CommentOnTask, context: DdlContext) -> Self { + Self { + context, + data: CommentOnData::new(task), + } + } + + pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { + let data = serde_json::from_str(json).context(FromJsonSnafu)?; + + Ok(Self { context, data }) + } + + pub async fn on_prepare(&mut self) -> Result { + match self.data.object_type { + CommentObjectType::Table | CommentObjectType::Column => { + self.prepare_table_or_column().await?; + } + CommentObjectType::Flow => { + self.prepare_flow().await?; + } + } + + // Fast path: if comment is unchanged, skip update + if self.data.is_unchanged { + let object_desc = match self.data.object_type { + CommentObjectType::Table => format!( + "table {}", + format_full_table_name( + &self.data.catalog_name, + &self.data.schema_name, + &self.data.object_name, + ) + ), + CommentObjectType::Column => format!( + "column {}.{}", + format_full_table_name( + &self.data.catalog_name, + &self.data.schema_name, + &self.data.object_name, + ), + self.data.column_name.as_ref().unwrap() + ), + CommentObjectType::Flow => { + format!("flow {}.{}", self.data.catalog_name, self.data.object_name) + } + }; + info!("Comment unchanged for {}, skipping update", object_desc); + return Ok(Status::done()); + } + + self.data.state = CommentOnState::UpdateMetadata; + Ok(Status::executing(true)) + } + + async fn prepare_table_or_column(&mut self) -> Result<()> { + let table_name_key = TableNameKey::new( + &self.data.catalog_name, + &self.data.schema_name, + &self.data.object_name, + ); + + let table_id = self + .context + .table_metadata_manager + .table_name_manager() + .get(table_name_key) + .await? + .with_context(|| TableNotFoundSnafu { + table_name: format_full_table_name( + &self.data.catalog_name, + &self.data.schema_name, + &self.data.object_name, + ), + })? + .table_id(); + + let table_info = self + .context + .table_metadata_manager + .table_info_manager() + .get(table_id) + .await? + .with_context(|| TableNotFoundSnafu { + table_name: format_full_table_name( + &self.data.catalog_name, + &self.data.schema_name, + &self.data.object_name, + ), + })?; + + // For column comments, validate the column exists + if self.data.object_type == CommentObjectType::Column { + let column_name = self.data.column_name.as_ref().unwrap(); + let column_exists = table_info + .table_info + .meta + .schema + .column_schemas + .iter() + .any(|col| &col.name == column_name); + + ensure!( + column_exists, + ColumnNotFoundSnafu { + column_name, + column_id: 0u32, // column_id is not known here + } + ); + } + + self.data.table_id = Some(table_id); + + // Check if comment is unchanged for early exit optimization + match self.data.object_type { + CommentObjectType::Table => { + let current_comment = &table_info.table_info.desc; + if &self.data.comment == current_comment { + self.data.is_unchanged = true; + } + } + CommentObjectType::Column => { + let column_name = self.data.column_name.as_ref().unwrap(); + let column_schema = table_info + .table_info + .meta + .schema + .column_schemas + .iter() + .find(|col| &col.name == column_name) + .unwrap(); // Safe: validated above + + let current_comment = column_schema.metadata().get(COLUMN_COMMENT_KEY); + if self.data.comment.as_deref() == current_comment.map(String::as_str) { + self.data.is_unchanged = true; + } + } + CommentObjectType::Flow => { + // this branch is handled in `prepare_flow` + } + } + + self.data.table_info = Some(table_info); + + Ok(()) + } + + async fn prepare_flow(&mut self) -> Result<()> { + let flow_name_value = self + .context + .flow_metadata_manager + .flow_name_manager() + .get(&self.data.catalog_name, &self.data.object_name) + .await? + .with_context(|| FlowNotFoundSnafu { + flow_name: &self.data.object_name, + })?; + + let flow_id = flow_name_value.flow_id(); + let flow_info = self + .context + .flow_metadata_manager + .flow_info_manager() + .get_raw(flow_id) + .await? + .with_context(|| FlowNotFoundSnafu { + flow_name: &self.data.object_name, + })?; + + self.data.flow_id = Some(flow_id); + + // Check if comment is unchanged for early exit optimization + let current_comment = &flow_info.get_inner_ref().comment; + let new_comment = self.data.comment.as_deref().unwrap_or(""); + if new_comment == current_comment.as_str() { + self.data.is_unchanged = true; + } + + self.data.flow_info = Some(flow_info); + + Ok(()) + } + + pub async fn on_update_metadata(&mut self) -> Result { + match self.data.object_type { + CommentObjectType::Table => { + self.update_table_comment().await?; + } + CommentObjectType::Column => { + self.update_column_comment().await?; + } + CommentObjectType::Flow => { + self.update_flow_comment().await?; + } + } + + self.data.state = CommentOnState::InvalidateCache; + Ok(Status::executing(true)) + } + + async fn update_table_comment(&mut self) -> Result<()> { + let table_info_value = self.data.table_info.as_ref().unwrap(); + let mut new_table_info = table_info_value.table_info.clone(); + + new_table_info.desc = self.data.comment.clone(); + + // Sync comment to table options + sync_table_comment_option( + &mut new_table_info.meta.options, + new_table_info.desc.as_deref(), + ); + + self.update_table_info(table_info_value, new_table_info) + .await?; + + info!( + "Updated comment for table {}.{}.{}", + self.data.catalog_name, self.data.schema_name, self.data.object_name + ); + + Ok(()) + } + + async fn update_column_comment(&mut self) -> Result<()> { + let table_info_value = self.data.table_info.as_ref().unwrap(); + let mut new_table_info = table_info_value.table_info.clone(); + + let column_name = self.data.column_name.as_ref().unwrap(); + let column_schema = new_table_info + .meta + .schema + .column_schemas + .iter_mut() + .find(|col| &col.name == column_name) + .unwrap(); // Safe: validated in prepare + + update_column_comment_metadata(column_schema, self.data.comment.clone()); + + self.update_table_info(table_info_value, new_table_info) + .await?; + + info!( + "Updated comment for column {}.{}.{}.{}", + self.data.catalog_name, self.data.schema_name, self.data.object_name, column_name + ); + + Ok(()) + } + + async fn update_flow_comment(&mut self) -> Result<()> { + let flow_id = self.data.flow_id.unwrap(); + let flow_info_value = self.data.flow_info.as_ref().unwrap(); + + let mut new_flow_info = flow_info_value.get_inner_ref().clone(); + new_flow_info.comment = self.data.comment.clone().unwrap_or_default(); + new_flow_info.updated_time = Utc::now(); + + let raw_value = serde_json::to_vec(&new_flow_info).context(crate::error::SerdeJsonSnafu)?; + + self.context + .table_metadata_manager + .kv_backend() + .put( + PutRequest::new() + .with_key(FlowInfoKey::new(flow_id).to_bytes()) + .with_value(raw_value), + ) + .await?; + + info!( + "Updated comment for flow {}.{}", + self.data.catalog_name, self.data.object_name + ); + + Ok(()) + } + + async fn update_table_info( + &self, + current_table_info: &DeserializedValueWithBytes, + new_table_info: RawTableInfo, + ) -> Result<()> { + let table_id = current_table_info.table_info.ident.table_id; + let new_table_info_value = current_table_info.update(new_table_info); + let raw_value = new_table_info_value.try_as_raw_value()?; + + self.context + .table_metadata_manager + .kv_backend() + .put( + PutRequest::new() + .with_key(TableInfoKey::new(table_id).to_bytes()) + .with_value(raw_value), + ) + .await?; + + Ok(()) + } + + pub async fn on_invalidate_cache(&mut self) -> Result { + let cache_invalidator = &self.context.cache_invalidator; + + match self.data.object_type { + CommentObjectType::Table | CommentObjectType::Column => { + let table_id = self.data.table_id.unwrap(); + let table_name = TableName::new( + self.data.catalog_name.clone(), + self.data.schema_name.clone(), + self.data.object_name.clone(), + ); + + let cache_ident = vec![ + CacheIdent::TableId(table_id), + CacheIdent::TableName(table_name), + ]; + + cache_invalidator + .invalidate(&Context::default(), &cache_ident) + .await?; + } + CommentObjectType::Flow => { + let flow_id = self.data.flow_id.unwrap(); + let cache_ident = vec![CacheIdent::FlowId(flow_id)]; + + cache_invalidator + .invalidate(&Context::default(), &cache_ident) + .await?; + } + } + + Ok(Status::done()) + } +} + +#[async_trait] +impl Procedure for CommentOnProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { + match self.data.state { + CommentOnState::Prepare => self.on_prepare().await, + CommentOnState::UpdateMetadata => self.on_update_metadata().await, + CommentOnState::InvalidateCache => self.on_invalidate_cache().await, + } + .map_err(map_to_procedure_error) + } + + fn dump(&self) -> ProcedureResult { + serde_json::to_string(&self.data).context(ToJsonSnafu) + } + + fn lock_key(&self) -> LockKey { + let catalog = &self.data.catalog_name; + let schema = &self.data.schema_name; + + let lock_key = match self.data.object_type { + CommentObjectType::Table | CommentObjectType::Column => { + vec![ + CatalogLock::Read(catalog).into(), + SchemaLock::read(catalog, schema).into(), + TableNameLock::new(catalog, schema, &self.data.object_name).into(), + ] + } + CommentObjectType::Flow => { + vec![ + CatalogLock::Read(catalog).into(), + FlowNameLock::new(catalog, &self.data.object_name).into(), + ] + } + }; + + LockKey::new(lock_key) + } +} + +#[derive(Debug, Serialize, Deserialize, AsRefStr)] +enum CommentOnState { + Prepare, + UpdateMetadata, + InvalidateCache, +} + +/// The data of comment on procedure. +#[derive(Debug, Serialize, Deserialize)] +pub struct CommentOnData { + state: CommentOnState, + catalog_name: String, + schema_name: String, + object_type: CommentObjectType, + object_name: String, + /// Column name (only for Column comments) + column_name: Option, + comment: Option, + /// Cached table ID (for Table/Column) + #[serde(skip_serializing_if = "Option::is_none")] + table_id: Option, + /// Cached table info (for Table/Column) + #[serde(skip)] + table_info: Option>, + /// Cached flow ID (for Flow) + #[serde(skip_serializing_if = "Option::is_none")] + flow_id: Option, + /// Cached flow info (for Flow) + #[serde(skip)] + flow_info: Option>, + /// Whether the comment is unchanged (optimization for early exit) + #[serde(skip)] + is_unchanged: bool, +} + +impl CommentOnData { + pub fn new(task: CommentOnTask) -> Self { + Self { + state: CommentOnState::Prepare, + catalog_name: task.catalog_name, + schema_name: task.schema_name, + object_type: task.object_type, + object_name: task.object_name, + column_name: task.column_name, + comment: task.comment, + table_id: None, + table_info: None, + flow_id: None, + flow_info: None, + is_unchanged: false, + } + } +} + +fn update_column_comment_metadata( + column_schema: &mut datatypes::schema::ColumnSchema, + comment: Option, +) { + match comment { + Some(value) => { + column_schema + .mut_metadata() + .insert(COLUMN_COMMENT_KEY.to_string(), value); + } + None => { + column_schema.mut_metadata().remove(COLUMN_COMMENT_KEY); + } + } +} + +fn sync_table_comment_option(options: &mut table::requests::TableOptions, comment: Option<&str>) { + match comment { + Some(value) => { + options + .extra_options + .insert(TABLE_COMMENT_KEY.to_string(), value.to_string()); + } + None => { + options.extra_options.remove(TABLE_COMMENT_KEY); + } + } +} diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 9ade13052d64..fd2150d47ef8 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -26,6 +26,7 @@ use store_api::storage::TableId; use crate::ddl::alter_database::AlterDatabaseProcedure; use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; use crate::ddl::alter_table::AlterTableProcedure; +use crate::ddl::comment_on::CommentOnProcedure; use crate::ddl::create_database::CreateDatabaseProcedure; use crate::ddl::create_flow::CreateFlowProcedure; use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; @@ -51,18 +52,18 @@ use crate::rpc::ddl::DdlTask::CreateTrigger; #[cfg(feature = "enterprise")] use crate::rpc::ddl::DdlTask::DropTrigger; use crate::rpc::ddl::DdlTask::{ - AlterDatabase, AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables, - CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables, DropTable, DropView, - TruncateTable, + AlterDatabase, AlterLogicalTables, AlterTable, CommentOn, CreateDatabase, CreateFlow, + CreateLogicalTables, CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables, + DropTable, DropView, TruncateTable, }; #[cfg(feature = "enterprise")] use crate::rpc::ddl::trigger::CreateTriggerTask; #[cfg(feature = "enterprise")] use crate::rpc::ddl::trigger::DropTriggerTask; use crate::rpc::ddl::{ - AlterDatabaseTask, AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask, - CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask, QueryContext, - SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask, + AlterDatabaseTask, AlterTableTask, CommentOnTask, CreateDatabaseTask, CreateFlowTask, + CreateTableTask, CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask, + QueryContext, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask, }; use crate::rpc::router::RegionRoute; @@ -181,7 +182,8 @@ impl DdlManager { TruncateTableProcedure, CreateDatabaseProcedure, DropDatabaseProcedure, - DropViewProcedure + DropViewProcedure, + CommentOnProcedure ); for (type_name, loader_factory) in loaders { @@ -397,6 +399,19 @@ impl DdlManager { self.submit_procedure(procedure_with_id).await } + /// Submits and executes a comment on task. + #[tracing::instrument(skip_all)] + pub async fn submit_comment_on_task( + &self, + comment_on_task: CommentOnTask, + ) -> Result<(ProcedureId, Option)> { + let context = self.create_context(); + let procedure = CommentOnProcedure::new(comment_on_task, context); + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + + self.submit_procedure(procedure_with_id).await + } + async fn submit_procedure( &self, procedure_with_id: ProcedureWithId, @@ -465,6 +480,7 @@ impl DdlManager { handle_create_view_task(self, create_view_task).await } DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await, + CommentOn(comment_on_task) => handle_comment_on_task(self, comment_on_task).await, #[cfg(feature = "enterprise")] CreateTrigger(create_trigger_task) => { handle_create_trigger_task( @@ -896,6 +912,26 @@ async fn handle_create_view_task( }) } +async fn handle_comment_on_task( + ddl_manager: &DdlManager, + comment_on_task: CommentOnTask, +) -> Result { + let (id, _) = ddl_manager + .submit_comment_on_task(comment_on_task.clone()) + .await?; + + let procedure_id = id.to_string(); + info!( + "Comment on {}.{}.{} is updated via procedure_id {id:?}", + comment_on_task.catalog_name, comment_on_task.schema_name, comment_on_task.object_name + ); + + Ok(SubmitDdlTaskResponse { + key: procedure_id.into(), + ..Default::default() + }) +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/src/common/meta/src/key/table_info.rs b/src/common/meta/src/key/table_info.rs index 1bb099f44c5e..93176a6d912d 100644 --- a/src/common/meta/src/key/table_info.rs +++ b/src/common/meta/src/key/table_info.rs @@ -94,7 +94,7 @@ impl TableInfoValue { } } - pub(crate) fn update(&self, new_table_info: RawTableInfo) -> Self { + pub fn update(&self, new_table_info: RawTableInfo) -> Self { Self { table_info: new_table_info, version: self.version + 1, diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index 9f0d69442a6e..12f055d083fd 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -23,19 +23,20 @@ use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind; use api::v1::meta::ddl_task_request::Task; use api::v1::meta::{ AlterDatabaseTask as PbAlterDatabaseTask, AlterTableTask as PbAlterTableTask, - AlterTableTasks as PbAlterTableTasks, CreateDatabaseTask as PbCreateDatabaseTask, - CreateFlowTask as PbCreateFlowTask, CreateTableTask as PbCreateTableTask, - CreateTableTasks as PbCreateTableTasks, CreateViewTask as PbCreateViewTask, - DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse, - DropDatabaseTask as PbDropDatabaseTask, DropFlowTask as PbDropFlowTask, - DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks, - DropViewTask as PbDropViewTask, Partition, ProcedureId, + AlterTableTasks as PbAlterTableTasks, CommentOnTask as PbCommentOnTask, + CreateDatabaseTask as PbCreateDatabaseTask, CreateFlowTask as PbCreateFlowTask, + CreateTableTask as PbCreateTableTask, CreateTableTasks as PbCreateTableTasks, + CreateViewTask as PbCreateViewTask, DdlTaskRequest as PbDdlTaskRequest, + DdlTaskResponse as PbDdlTaskResponse, DropDatabaseTask as PbDropDatabaseTask, + DropFlowTask as PbDropFlowTask, DropTableTask as PbDropTableTask, + DropTableTasks as PbDropTableTasks, DropViewTask as PbDropViewTask, Partition, ProcedureId, TruncateTableTask as PbTruncateTableTask, }; use api::v1::{ - AlterDatabaseExpr, AlterTableExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, - CreateViewExpr, DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, EvalInterval, - ExpireAfter, Option as PbOption, QueryContext as PbQueryContext, TruncateTableExpr, + AlterDatabaseExpr, AlterTableExpr, CommentObjectType as PbCommentObjectType, CommentOnExpr, + CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropDatabaseExpr, + DropFlowExpr, DropTableExpr, DropViewExpr, EvalInterval, ExpireAfter, Option as PbOption, + QueryContext as PbQueryContext, TruncateTableExpr, }; use base64::Engine as _; use base64::engine::general_purpose; @@ -77,6 +78,7 @@ pub enum DdlTask { DropView(DropViewTask), #[cfg(feature = "enterprise")] CreateTrigger(trigger::CreateTriggerTask), + CommentOn(CommentOnTask), } impl DdlTask { @@ -199,6 +201,11 @@ impl DdlTask { view_info, }) } + + /// Creates a [`DdlTask`] to comment on a table, column, or flow. + pub fn new_comment_on(task: CommentOnTask) -> Self { + DdlTask::CommentOn(task) + } } impl TryFrom for DdlTask { @@ -277,6 +284,7 @@ impl TryFrom for DdlTask { .fail() } } + Task::CommentOnTask(comment_on) => Ok(DdlTask::CommentOn(comment_on.try_into()?)), } } } @@ -331,6 +339,7 @@ impl TryFrom for PbDdlTaskRequest { DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.try_into()?), #[cfg(feature = "enterprise")] DdlTask::DropTrigger(task) => Task::DropTriggerTask(task.into()), + DdlTask::CommentOn(task) => Task::CommentOnTask(task.into()), }; Ok(Self { @@ -1260,6 +1269,119 @@ impl From for PbDropFlowTask { } } +/// Represents the ID of the object being commented on (Table or Flow). +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum CommentObjectId { + Table(TableId), + Flow(FlowId), +} + +/// Comment on table, column, or flow +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct CommentOnTask { + pub catalog_name: String, + pub schema_name: String, + pub object_type: CommentObjectType, + pub object_name: String, + /// Column name (only for Column comments) + pub column_name: Option, + /// Object ID (Table or Flow) for validation and cache invalidation + pub object_id: Option, + pub comment: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum CommentObjectType { + Table, + Column, + Flow, +} + +impl CommentOnTask { + pub fn table_ref(&self) -> TableReference<'_> { + TableReference { + catalog: &self.catalog_name, + schema: &self.schema_name, + table: &self.object_name, + } + } +} + +// Proto conversions for CommentObjectType +impl From for PbCommentObjectType { + fn from(object_type: CommentObjectType) -> Self { + match object_type { + CommentObjectType::Table => PbCommentObjectType::Table, + CommentObjectType::Column => PbCommentObjectType::Column, + CommentObjectType::Flow => PbCommentObjectType::Flow, + } + } +} + +impl TryFrom for CommentObjectType { + type Error = error::Error; + + fn try_from(value: i32) -> Result { + match value { + 0 => Ok(CommentObjectType::Table), + 1 => Ok(CommentObjectType::Column), + 2 => Ok(CommentObjectType::Flow), + _ => error::InvalidProtoMsgSnafu { + err_msg: format!( + "Invalid CommentObjectType value: {}. Valid values are: 0 (Table), 1 (Column), 2 (Flow)", + value + ), + } + .fail(), + } + } +} + +// Proto conversions for CommentOnTask +impl TryFrom for CommentOnTask { + type Error = error::Error; + + fn try_from(pb: PbCommentOnTask) -> Result { + let comment_on = pb.comment_on.context(error::InvalidProtoMsgSnafu { + err_msg: "expected comment_on", + })?; + + Ok(CommentOnTask { + catalog_name: comment_on.catalog_name, + schema_name: comment_on.schema_name, + object_type: comment_on.object_type.try_into()?, + object_name: comment_on.object_name, + column_name: if comment_on.column_name.is_empty() { + None + } else { + Some(comment_on.column_name) + }, + comment: if comment_on.comment.is_empty() { + None + } else { + Some(comment_on.comment) + }, + object_id: None, + }) + } +} + +impl From for PbCommentOnTask { + fn from(task: CommentOnTask) -> Self { + let pb_object_type: PbCommentObjectType = task.object_type.into(); + PbCommentOnTask { + comment_on: Some(CommentOnExpr { + catalog_name: task.catalog_name, + schema_name: task.schema_name, + object_type: pb_object_type as i32, + object_name: task.object_name, + column_name: task.column_name.unwrap_or_default(), + comment: task.comment.unwrap_or_default(), + }), + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct QueryContext { pub(crate) current_catalog: String, diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 062bc0cf9544..e1da6e2ff9e6 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -83,6 +83,7 @@ use snafu::prelude::*; use sql::ast::ObjectNamePartExt; use sql::dialect::Dialect; use sql::parser::{ParseOptions, ParserContext}; +use sql::statements::comment::CommentObject; use sql::statements::copy::{CopyDatabase, CopyTable}; use sql::statements::statement::Statement; use sql::statements::tql::Tql; @@ -875,7 +876,7 @@ pub fn check_permission( validate_param(&stmt.table_name, query_ctx)?; } Statement::ShowCreateFlow(stmt) => { - validate_param(&stmt.flow_name, query_ctx)?; + validate_flow(&stmt.flow_name, query_ctx)?; } #[cfg(feature = "enterprise")] Statement::ShowCreateTrigger(stmt) => { @@ -908,6 +909,12 @@ pub fn check_permission( // show charset and show collation won't be checked Statement::ShowCharset(_) | Statement::ShowCollation(_) => {} + Statement::Comment(comment) => match &comment.object { + CommentObject::Table(table) => validate_param(table, query_ctx)?, + CommentObject::Column { table, .. } => validate_param(table, query_ctx)?, + CommentObject::Flow(flow) => validate_flow(flow, query_ctx)?, + }, + Statement::Insert(insert) => { let name = insert.table_name().context(ParseSqlSnafu)?; validate_param(name, query_ctx)?; @@ -993,6 +1000,27 @@ fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> .context(SqlExecInterceptedSnafu) } +fn validate_flow(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> { + let catalog = match &name.0[..] { + [_flow] => query_ctx.current_catalog().to_string(), + [catalog, _flow] => catalog.to_string_unquoted(), + _ => { + return InvalidSqlSnafu { + err_msg: format!( + "expect flow name to be . or , actual: {name}", + ), + } + .fail(); + } + }; + + let schema = query_ctx.current_schema(); + + validate_catalog_and_schema(&catalog, &schema, query_ctx) + .map_err(BoxedError::new) + .context(SqlExecInterceptedSnafu) +} + fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> { let (catalog, schema) = match &name.0[..] { [schema] => ( @@ -1251,6 +1279,28 @@ mod tests { // test describe table let sql = "DESC TABLE {catalog}{schema}demo;"; - replace_test(sql, plugins, &query_ctx); + replace_test(sql, plugins.clone(), &query_ctx); + + let comment_flow_cases = [ + ("COMMENT ON FLOW my_flow IS 'comment';", true), + ("COMMENT ON FLOW greptime.my_flow IS 'comment';", true), + ("COMMENT ON FLOW wrongcatalog.my_flow IS 'comment';", false), + ]; + for (sql, is_ok) in comment_flow_cases { + let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0]; + let result = check_permission(plugins.clone(), stmt, &query_ctx); + assert_eq!(result.is_ok(), is_ok); + } + + let show_flow_cases = [ + ("SHOW CREATE FLOW my_flow;", true), + ("SHOW CREATE FLOW greptime.my_flow;", true), + ("SHOW CREATE FLOW wrongcatalog.my_flow;", false), + ]; + for (sql, is_ok) in show_flow_cases { + let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0]; + let result = check_permission(plugins.clone(), stmt, &query_ctx); + assert_eq!(result.is_ok(), is_ok); + } } } diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 9eeb57ce0131..129767f177d7 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -230,6 +230,11 @@ impl GrpcQueryHandler for Instance { DdlExpr::DropView(_) => { todo!("implemented in the following PR") } + DdlExpr::CommentOn(expr) => { + self.statement_executor + .comment_by_expr(expr, ctx.clone()) + .await? + } } } }; @@ -330,6 +335,9 @@ fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryConte Expr::DropView(expr) => { check_and_fill!(expr); } + Expr::CommentOn(expr) => { + check_and_fill!(expr); + } } } diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index 47a89949855c..2c5fd1398def 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -13,6 +13,7 @@ // limitations under the License. mod admin; +mod comment; mod copy_database; mod copy_query_to; mod copy_table_from; @@ -420,6 +421,7 @@ impl StatementExecutor { Statement::ShowCreateTrigger(show) => self.show_create_trigger(show, query_ctx).await, Statement::SetVariables(set_var) => self.set_variables(set_var, query_ctx), Statement::ShowVariables(show_variable) => self.show_variable(show_variable, query_ctx), + Statement::Comment(stmt) => self.comment(stmt, query_ctx).await, Statement::ShowColumns(show_columns) => { self.show_columns(show_columns, query_ctx).await } diff --git a/src/operator/src/statement/comment.rs b/src/operator/src/statement/comment.rs new file mode 100644 index 000000000000..d82d059ad9b8 --- /dev/null +++ b/src/operator/src/statement/comment.rs @@ -0,0 +1,176 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::CommentOnExpr; +use common_error::ext::BoxedError; +use common_meta::procedure_executor::ExecutorContext; +use common_meta::rpc::ddl::{CommentObjectType, CommentOnTask, DdlTask, SubmitDdlTaskRequest}; +use common_query::Output; +use session::context::QueryContextRef; +use session::table_name::table_idents_to_full_name; +use snafu::ResultExt; +use sql::ast::ObjectNamePartExt; +use sql::statements::comment::{Comment, CommentObject}; + +use crate::error::{ExecuteDdlSnafu, ExternalSnafu, InvalidSqlSnafu, Result}; +use crate::statement::StatementExecutor; + +impl StatementExecutor { + /// Adds a comment to a database object (table, column, or flow). + /// + /// # Arguments + /// + /// * `stmt`: A `Comment` struct containing the object to comment on and the comment text. + /// * `query_ctx`: A `QueryContextRef` providing contextual information for the query. + /// + /// # Returns + /// + /// A `Result` containing the `Output` of the operation, or an error if the operation fails. + pub async fn comment(&self, stmt: Comment, query_ctx: QueryContextRef) -> Result { + let comment_on_task = self.create_comment_on_task_from_stmt(stmt, &query_ctx)?; + + let request = SubmitDdlTaskRequest { + task: DdlTask::new_comment_on(comment_on_task), + query_context: query_ctx, + }; + + self.procedure_executor + .submit_ddl_task(&ExecutorContext::default(), request) + .await + .context(ExecuteDdlSnafu) + .map(|_| Output::new_with_affected_rows(0)) + } + + pub async fn comment_by_expr( + &self, + expr: CommentOnExpr, + query_ctx: QueryContextRef, + ) -> Result { + let comment_on_task = self.create_comment_on_task_from_expr(expr)?; + + let request = SubmitDdlTaskRequest { + task: DdlTask::new_comment_on(comment_on_task), + query_context: query_ctx, + }; + + self.procedure_executor + .submit_ddl_task(&ExecutorContext::default(), request) + .await + .context(ExecuteDdlSnafu) + .map(|_| Output::new_with_affected_rows(0)) + } + + fn create_comment_on_task_from_expr(&self, expr: CommentOnExpr) -> Result { + let object_type = match expr.object_type { + 0 => CommentObjectType::Table, + 1 => CommentObjectType::Column, + 2 => CommentObjectType::Flow, + _ => { + return InvalidSqlSnafu { + err_msg: format!( + "Invalid CommentObjectType value: {}. Valid values are: 0 (Table), 1 (Column), 2 (Flow)", + expr.object_type + ), + } + .fail(); + } + }; + + Ok(CommentOnTask { + catalog_name: expr.catalog_name, + schema_name: expr.schema_name, + object_type, + object_name: expr.object_name, + column_name: if expr.column_name.is_empty() { + None + } else { + Some(expr.column_name) + }, + object_id: None, + comment: if expr.comment.is_empty() { + None + } else { + Some(expr.comment) + }, + }) + } + + fn create_comment_on_task_from_stmt( + &self, + stmt: Comment, + query_ctx: &QueryContextRef, + ) -> Result { + match stmt.object { + CommentObject::Table(table) => { + let (catalog_name, schema_name, table_name) = + table_idents_to_full_name(&table, query_ctx) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + + Ok(CommentOnTask { + catalog_name, + schema_name, + object_type: CommentObjectType::Table, + object_name: table_name, + column_name: None, + object_id: None, + comment: stmt.comment, + }) + } + CommentObject::Column { table, column } => { + let (catalog_name, schema_name, table_name) = + table_idents_to_full_name(&table, query_ctx) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + + Ok(CommentOnTask { + catalog_name, + schema_name, + object_type: CommentObjectType::Column, + object_name: table_name, + column_name: Some(column.value), + object_id: None, + comment: stmt.comment, + }) + } + CommentObject::Flow(flow_name) => { + let (catalog_name, flow_name_str) = match &flow_name.0[..] { + [flow] => ( + query_ctx.current_catalog().to_string(), + flow.to_string_unquoted(), + ), + [catalog, flow] => (catalog.to_string_unquoted(), flow.to_string_unquoted()), + _ => { + return InvalidSqlSnafu { + err_msg: format!( + "expect flow name to be . or , actual: {flow_name}" + ), + } + .fail(); + } + }; + + Ok(CommentOnTask { + catalog_name, + schema_name: String::new(), // Flow doesn't use schema + object_type: CommentObjectType::Flow, + object_name: flow_name_str, + column_name: None, + object_id: None, + comment: stmt.comment, + }) + } + } + } +} diff --git a/src/operator/src/statement/show.rs b/src/operator/src/statement/show.rs index 11c34fb2ffd4..08c0b4661a8b 100644 --- a/src/operator/src/statement/show.rs +++ b/src/operator/src/statement/show.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use common_error::ext::BoxedError; use common_meta::key::schema_name::SchemaNameKey; use common_query::Output; @@ -120,7 +122,30 @@ impl StatementExecutor { table: TableRef, query_ctx: QueryContextRef, ) -> Result { - let table_info = table.table_info(); + let mut table_info = table.table_info(); + let partition_column_names: Vec<_> = + table_info.meta.partition_column_names().cloned().collect(); + + if let Some(latest) = self + .table_metadata_manager + .table_info_manager() + .get(table_info.table_id()) + .await + .context(TableMetadataManagerSnafu)? + { + let mut latest_info = TableInfo::try_from(latest.into_inner().table_info) + .context(error::CreateTableInfoSnafu)?; + + if !partition_column_names.is_empty() { + latest_info.meta.partition_key_indices = partition_column_names + .iter() + .filter_map(|name| latest_info.meta.schema.column_index_by_name(name.as_str())) + .collect(); + } + + table_info = Arc::new(latest_info); + } + if table_info.table_type != TableType::Base { return error::ShowCreateTableBaseOnlySnafu { table_name: table_name.to_string(), @@ -150,7 +175,7 @@ impl StatementExecutor { let partitions = create_partitions_stmt(&table_info, partitions)?; - query::sql::show_create_table(table, schema_options, partitions, query_ctx) + query::sql::show_create_table(table_info, schema_options, partitions, query_ctx) .context(ExecuteStatementSnafu) } diff --git a/src/query/src/sql.rs b/src/query/src/sql.rs index 6b6ee2ed07e5..2caccef8d7b5 100644 --- a/src/query/src/sql.rs +++ b/src/query/src/sql.rs @@ -64,6 +64,7 @@ use sql::statements::statement::Statement; use sqlparser::ast::ObjectName; use store_api::metric_engine_consts::{is_metric_engine, is_metric_engine_internal_column}; use table::TableRef; +use table::metadata::TableInfoRef; use table::requests::{FILE_TABLE_LOCATION_KEY, FILE_TABLE_PATTERN_KEY}; use crate::QueryEngineRef; @@ -789,13 +790,12 @@ pub fn show_create_database(database_name: &str, options: OptionMap) -> Result, partitions: Option, query_ctx: QueryContextRef, ) -> Result { - let table_info = table.table_info(); - let table_name = &table_info.name; + let table_name = table_info.name.clone(); let quote_style = query_ctx.quote_style(); @@ -806,7 +806,7 @@ pub fn show_create_table( }); let sql = format!("{}", stmt); let columns = vec![ - Arc::new(StringVector::from(vec![table_name.clone()])) as _, + Arc::new(StringVector::from(vec![table_name])) as _, Arc::new(StringVector::from(vec![sql])) as _, ]; let records = RecordBatches::try_from_columns(SHOW_CREATE_TABLE_OUTPUT_SCHEMA.clone(), columns) diff --git a/src/query/src/sql/show_create_table.rs b/src/query/src/sql/show_create_table.rs index 5466bb91e648..61dd703aa89e 100644 --- a/src/query/src/sql/show_create_table.rs +++ b/src/query/src/sql/show_create_table.rs @@ -32,7 +32,9 @@ use sql::statements::create::{Column, ColumnExtensions, CreateTable, TableConstr use sql::statements::{self, OptionMap}; use store_api::metric_engine_consts::{is_metric_engine, is_metric_engine_internal_column}; use table::metadata::{TableInfoRef, TableMeta}; -use table::requests::{FILE_TABLE_META_KEY, TTL_KEY, WRITE_BUFFER_SIZE_KEY}; +use table::requests::{ + COMMENT_KEY as TABLE_COMMENT_KEY, FILE_TABLE_META_KEY, TTL_KEY, WRITE_BUFFER_SIZE_KEY, +}; use crate::error::{ ConvertSqlTypeSnafu, ConvertSqlValueSnafu, GetFulltextOptionsSnafu, @@ -238,6 +240,13 @@ pub fn create_table_stmt( let constraints = create_table_constraints(&table_meta.engine, schema, table_meta, quote_style); + let mut options = create_sql_options(table_meta, schema_options); + if let Some(comment) = &table_info.desc + && options.get(TABLE_COMMENT_KEY).is_none() + { + options.insert(format!("'{TABLE_COMMENT_KEY}'"), comment.clone()); + } + Ok(CreateTable { if_not_exists: true, table_id: table_info.ident.table_id, @@ -245,7 +254,7 @@ pub fn create_table_stmt( columns, engine: table_meta.engine.clone(), constraints, - options: create_sql_options(table_meta, schema_options), + options, partitions: None, }) } diff --git a/src/sql/src/parser.rs b/src/sql/src/parser.rs index 6e2a880348cd..310de35adb95 100644 --- a/src/sql/src/parser.rs +++ b/src/sql/src/parser.rs @@ -163,6 +163,8 @@ impl ParserContext<'_> { Keyword::TRUNCATE => self.parse_truncate(), + Keyword::COMMENT => self.parse_comment(), + Keyword::SET => self.parse_set_variables(), Keyword::ADMIN => self.parse_admin_command(), diff --git a/src/sql/src/parsers.rs b/src/sql/src/parsers.rs index e3c41c49b289..7d68d5d1ce0d 100644 --- a/src/sql/src/parsers.rs +++ b/src/sql/src/parsers.rs @@ -14,6 +14,7 @@ pub(crate) mod admin_parser; mod alter_parser; +pub(crate) mod comment_parser; pub(crate) mod copy_parser; pub(crate) mod create_parser; pub(crate) mod cursor_parser; diff --git a/src/sql/src/parsers/comment_parser.rs b/src/sql/src/parsers/comment_parser.rs new file mode 100644 index 000000000000..489d191d57f9 --- /dev/null +++ b/src/sql/src/parsers/comment_parser.rs @@ -0,0 +1,193 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use snafu::{ResultExt, ensure}; +use sqlparser::ast::ObjectName; +use sqlparser::keywords::Keyword; +use sqlparser::tokenizer::Token; + +use crate::ast::{Ident, ObjectNamePart}; +use crate::error::{self, InvalidSqlSnafu, Result}; +use crate::parser::{FLOW, ParserContext}; +use crate::statements::comment::{Comment, CommentObject}; +use crate::statements::statement::Statement; + +impl ParserContext<'_> { + pub(crate) fn parse_comment(&mut self) -> Result { + let _ = self.parser.next_token(); // consume COMMENT + + if !self.parser.parse_keyword(Keyword::ON) { + return self.expected("ON", self.parser.peek_token()); + } + + let target_token = self.parser.next_token(); + let comment = match target_token.token { + Token::Word(word) if word.keyword == Keyword::TABLE => { + let raw_table = + self.parse_object_name() + .with_context(|_| error::UnexpectedSnafu { + expected: "a table name", + actual: self.peek_token_as_string(), + })?; + let table = Self::canonicalize_object_name(raw_table); + CommentObject::Table(table) + } + Token::Word(word) if word.keyword == Keyword::COLUMN => { + self.parse_column_comment_target()? + } + Token::Word(word) + if word.keyword == Keyword::NoKeyword && word.value.eq_ignore_ascii_case(FLOW) => + { + let raw_flow = + self.parse_object_name() + .with_context(|_| error::UnexpectedSnafu { + expected: "a flow name", + actual: self.peek_token_as_string(), + })?; + let flow = Self::canonicalize_object_name(raw_flow); + CommentObject::Flow(flow) + } + _ => return self.expected("TABLE, COLUMN or FLOW", target_token), + }; + + if !self.parser.parse_keyword(Keyword::IS) { + return self.expected("IS", self.parser.peek_token()); + } + + let comment_value = if self.parser.parse_keyword(Keyword::NULL) { + None + } else { + Some( + self.parser + .parse_literal_string() + .context(error::SyntaxSnafu)?, + ) + }; + + Ok(Statement::Comment(Comment { + object: comment, + comment: comment_value, + })) + } + + fn parse_column_comment_target(&mut self) -> Result { + let raw = self + .parse_object_name() + .with_context(|_| error::UnexpectedSnafu { + expected: "a column reference", + actual: self.peek_token_as_string(), + })?; + let canonical = Self::canonicalize_object_name(raw); + + let mut parts = canonical.0; + ensure!( + parts.len() >= 2, + InvalidSqlSnafu { + msg: "COMMENT ON COLUMN expects .".to_string(), + } + ); + + let column_part = parts.pop().unwrap(); + let ObjectNamePart::Identifier(column_ident) = column_part; + + let column = ParserContext::canonicalize_identifier(column_ident); + + let mut table_idents: Vec = Vec::with_capacity(parts.len()); + for part in parts { + match part { + ObjectNamePart::Identifier(ident) => { + table_idents.push(ident); + } + } + } + + ensure!( + !table_idents.is_empty(), + InvalidSqlSnafu { + msg: "Table name is required before column name".to_string(), + } + ); + + let table = ObjectName::from(table_idents); + + Ok(CommentObject::Column { table, column }) + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use crate::dialect::GreptimeDbDialect; + use crate::parser::{ParseOptions, ParserContext}; + use crate::statements::comment::CommentObject; + use crate::statements::statement::Statement; + + fn parse(sql: &str) -> Statement { + let mut stmts = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + assert_eq!(stmts.len(), 1); + stmts.pop().unwrap() + } + + #[test] + fn test_parse_comment_on_table() { + let stmt = parse("COMMENT ON TABLE mytable IS 'test';"); + match stmt { + Statement::Comment(comment) => { + assert_matches!(comment.object, CommentObject::Table(ref name) if name.to_string() == "mytable"); + assert_eq!(comment.comment.as_deref(), Some("test")); + } + _ => panic!("expected comment statement"), + } + + let stmt = parse("COMMENT ON TABLE mytable IS NULL;"); + match stmt { + Statement::Comment(comment) => { + assert_matches!(comment.object, CommentObject::Table(ref name) if name.to_string() == "mytable"); + assert!(comment.comment.is_none()); + } + _ => panic!("expected comment statement"), + } + } + + #[test] + fn test_parse_comment_on_column() { + let stmt = parse("COMMENT ON COLUMN my_schema.my_table.my_col IS 'desc';"); + match stmt { + Statement::Comment(comment) => match comment.object { + CommentObject::Column { table, column } => { + assert_eq!(table.to_string(), "my_schema.my_table"); + assert_eq!(column.value, "my_col"); + assert_eq!(comment.comment.as_deref(), Some("desc")); + } + _ => panic!("expected column comment"), + }, + _ => panic!("expected comment statement"), + } + } + + #[test] + fn test_parse_comment_on_flow() { + let stmt = parse("COMMENT ON FLOW my_flow IS 'desc';"); + match stmt { + Statement::Comment(comment) => { + assert_matches!(comment.object, CommentObject::Flow(ref name) if name.to_string() == "my_flow"); + assert_eq!(comment.comment.as_deref(), Some("desc")); + } + _ => panic!("expected comment statement"), + } + } +} diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index 051368f12cb0..eb94604cc574 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -14,6 +14,7 @@ pub mod admin; pub mod alter; +pub mod comment; pub mod copy; pub mod create; pub mod cursor; diff --git a/src/sql/src/statements/comment.rs b/src/sql/src/statements/comment.rs new file mode 100644 index 000000000000..ec0f8d37b679 --- /dev/null +++ b/src/sql/src/statements/comment.rs @@ -0,0 +1,67 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::{self, Display, Formatter}; + +use serde::Serialize; +use sqlparser_derive::{Visit, VisitMut}; + +use crate::ast::{Ident, ObjectName}; + +/// Represents a SQL COMMENT statement for adding or removing comments on database objects. +/// +/// # Examples +/// +/// ```sql +/// COMMENT ON TABLE my_table IS 'This is a table comment'; +/// COMMENT ON COLUMN my_table.my_column IS 'This is a column comment'; +/// COMMENT ON FLOW my_flow IS NULL; +/// ``` +#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut, Serialize)] +pub struct Comment { + pub object: CommentObject, + pub comment: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut, Serialize)] +pub enum CommentObject { + Table(ObjectName), + Column { table: ObjectName, column: Ident }, + Flow(ObjectName), +} + +impl Display for Comment { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "COMMENT ON {} IS ", self.object)?; + match &self.comment { + Some(comment) => { + let escaped = comment.replace('\'', "''"); + write!(f, "'{}'", escaped) + } + None => f.write_str("NULL"), + } + } +} + +impl Display for CommentObject { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + CommentObject::Table(name) => write!(f, "TABLE {}", name), + CommentObject::Column { table, column } => { + write!(f, "COLUMN {}.{}", table, column) + } + CommentObject::Flow(name) => write!(f, "FLOW {}", name), + } + } +} diff --git a/src/sql/src/statements/statement.rs b/src/sql/src/statements/statement.rs index d0096baa7152..83cdaff7eea6 100644 --- a/src/sql/src/statements/statement.rs +++ b/src/sql/src/statements/statement.rs @@ -22,6 +22,7 @@ use sqlparser_derive::{Visit, VisitMut}; use crate::error::{ConvertToDfStatementSnafu, Error}; use crate::statements::admin::Admin; use crate::statements::alter::{AlterDatabase, AlterTable}; +use crate::statements::comment::Comment; use crate::statements::copy::Copy; use crate::statements::create::{ CreateDatabase, CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, @@ -137,6 +138,8 @@ pub enum Statement { SetVariables(SetVariables), // SHOW VARIABLES ShowVariables(ShowVariables), + // COMMENT ON + Comment(Comment), // USE Use(String), // Admin statement(extension) @@ -204,6 +207,7 @@ impl Statement { | Statement::Copy(_) | Statement::TruncateTable(_) | Statement::SetVariables(_) + | Statement::Comment(_) | Statement::Use(_) | Statement::DeclareCursor(_) | Statement::CloseCursor(_) @@ -267,6 +271,7 @@ impl Display for Statement { Statement::TruncateTable(s) => s.fmt(f), Statement::SetVariables(s) => s.fmt(f), Statement::ShowVariables(s) => s.fmt(f), + Statement::Comment(s) => s.fmt(f), Statement::ShowCharset(kind) => { write!(f, "SHOW CHARSET {kind}") } diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index b5a130137cbc..8d4dc9c3ceb5 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -1256,6 +1256,7 @@ CREATE TABLE {table_name} ( | | | | | ENGINE=mito | | | WITH( | +| | 'comment' = 'Created on insertion', | | | 'compaction.twcs.time_window' = '1d', | | | 'compaction.type' = 'twcs' | | | ) | diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index b9e56564a58b..6caf70717310 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -511,23 +511,24 @@ async fn insert_with_hints_and_assert(db: &Database) { let pretty = record_batches.pretty_print().unwrap(); let expected = "\ -+-------+-------------------------------------+ -| Table | Create Table | -+-------+-------------------------------------+ -| demo | CREATE TABLE IF NOT EXISTS \"demo\" ( | -| | \"host\" STRING NULL, | -| | \"cpu\" DOUBLE NULL, | -| | \"memory\" DOUBLE NULL, | -| | \"ts\" TIMESTAMP(3) NOT NULL, | -| | TIME INDEX (\"ts\"), | -| | PRIMARY KEY (\"host\") | -| | ) | -| | | -| | ENGINE=mito | -| | WITH( | -| | append_mode = 'true' | -| | ) | -+-------+-------------------------------------+\ ++-------+---------------------------------------+ +| Table | Create Table | ++-------+---------------------------------------+ +| demo | CREATE TABLE IF NOT EXISTS \"demo\" ( | +| | \"host\" STRING NULL, | +| | \"cpu\" DOUBLE NULL, | +| | \"memory\" DOUBLE NULL, | +| | \"ts\" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX (\"ts\"), | +| | PRIMARY KEY (\"host\") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'comment' = 'Created on insertion', | +| | append_mode = 'true' | +| | ) | ++-------+---------------------------------------+\ "; assert_eq!(pretty, expected); diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 347a8d12c9b0..2f4e25a3e725 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1743,7 +1743,7 @@ pub async fn test_prometheus_remote_special_labels(store_type: StorageType) { expected, ) .await; - let expected = "[[\"idc3_lo_table\",\"CREATE TABLE IF NOT EXISTS \\\"idc3_lo_table\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'f1'\\n)\"]]"; + let expected = "[[\"idc3_lo_table\",\"CREATE TABLE IF NOT EXISTS \\\"idc3_lo_table\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n \'comment\' = 'Created on insertion',\\n on_physical_table = 'f1'\\n)\"]]"; validate_data( "test_prometheus_remote_special_labels_idc3_show_create_table", &client, @@ -1769,7 +1769,7 @@ pub async fn test_prometheus_remote_special_labels(store_type: StorageType) { expected, ) .await; - let expected = "[[\"idc4_local_table\",\"CREATE TABLE IF NOT EXISTS \\\"idc4_local_table\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'f2'\\n)\"]]"; + let expected = "[[\"idc4_local_table\",\"CREATE TABLE IF NOT EXISTS \\\"idc4_local_table\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n \'comment\' = 'Created on insertion',\\n on_physical_table = 'f2'\\n)\"]]"; validate_data( "test_prometheus_remote_special_labels_idc4_show_create_table", &client, @@ -2231,7 +2231,7 @@ transform: assert_eq!(res.status(), StatusCode::OK); // 3. check schema - let expected_schema = "[[\"logs1\",\"CREATE TABLE IF NOT EXISTS \\\"logs1\\\" (\\n \\\"id1\\\" INT NULL INVERTED INDEX,\\n \\\"id2\\\" INT NULL INVERTED INDEX,\\n \\\"logger\\\" STRING NULL,\\n \\\"type\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"log\\\" STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\"),\\n PRIMARY KEY (\\\"type\\\", \\\"log\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]"; + let expected_schema = "[[\"logs1\",\"CREATE TABLE IF NOT EXISTS \\\"logs1\\\" (\\n \\\"id1\\\" INT NULL INVERTED INDEX,\\n \\\"id2\\\" INT NULL INVERTED INDEX,\\n \\\"logger\\\" STRING NULL,\\n \\\"type\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"log\\\" STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\"),\\n PRIMARY KEY (\\\"type\\\", \\\"log\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n \'comment\' = 'Created on insertion',\\n append_mode = 'true'\\n)\"]]"; validate_data( "pipeline_schema", &client, @@ -3072,9 +3072,10 @@ table_suffix: _${type} // ) // ENGINE=mito // WITH( + // 'comment' = 'Created on insertion', // append_mode = 'true' // ) - let expected = "[[\"d_table_db\",\"CREATE TABLE IF NOT EXISTS \\\"d_table_db\\\" (\\n \\\"id1_root\\\" INT NULL,\\n \\\"id2_root\\\" INT NULL,\\n \\\"type\\\" STRING NULL,\\n \\\"log\\\" STRING NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]"; + let expected = "[[\"d_table_db\",\"CREATE TABLE IF NOT EXISTS \\\"d_table_db\\\" (\\n \\\"id1_root\\\" INT NULL,\\n \\\"id2_root\\\" INT NULL,\\n \\\"type\\\" STRING NULL,\\n \\\"log\\\" STRING NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true'\\n)\"]]"; validate_data( "test_pipeline_context_db", @@ -3089,11 +3090,12 @@ table_suffix: _${type} // ) // ENGINE=mito // WITH( + // 'comment' = 'Created on insertion', // append_mode = 'true', // skip_wal = 'true', // ttl = '1day' // ) - let expected = "[[\"d_table_http\",\"CREATE TABLE IF NOT EXISTS \\\"d_table_http\\\" (\\n \\\"id1_root\\\" INT NULL,\\n \\\"id2_root\\\" INT NULL,\\n \\\"type\\\" STRING NULL,\\n \\\"log\\\" STRING NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true',\\n skip_wal = 'true',\\n ttl = '1day'\\n)\"]]"; + let expected = "[[\"d_table_http\",\"CREATE TABLE IF NOT EXISTS \\\"d_table_http\\\" (\\n \\\"id1_root\\\" INT NULL,\\n \\\"id2_root\\\" INT NULL,\\n \\\"type\\\" STRING NULL,\\n \\\"log\\\" STRING NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n \'comment\' = 'Created on insertion',\\n append_mode = 'true',\\n skip_wal = 'true',\\n ttl = '1day'\\n)\"]]"; validate_data( "test_pipeline_context_http", &client, @@ -3306,13 +3308,14 @@ transform: // ) // ENGINE=mito // WITH( + // 'comment' = 'Created on insertion', // append_mode = 'true' // ) validate_data( "test_pipeline_2_schema", &client, "show create table d_table", - "[[\"d_table\",\"CREATE TABLE IF NOT EXISTS \\\"d_table\\\" (\\n \\\"id1\\\" INT NULL INVERTED INDEX,\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n \\\"id2\\\" STRING NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]", + "[[\"d_table\",\"CREATE TABLE IF NOT EXISTS \\\"d_table\\\" (\\n \\\"id1\\\" INT NULL INVERTED INDEX,\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n \\\"id2\\\" STRING NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true'\\n)\"]]", ) .await; @@ -4275,10 +4278,11 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) { // ) // ENGINE=metric // WITH( + // 'comment' = 'Created on insertion', // on_physical_table = 'greptime_physical_table', // otlp_metric_compat = 'prom' // ) - let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"host_arch\\\" STRING NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"otel_scope_name\\\" STRING NULL,\\n \\\"otel_scope_schema_url\\\" STRING NULL,\\n \\\"otel_scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"host_arch\\\", \\\"job\\\", \\\"model\\\", \\\"os_version\\\", \\\"otel_scope_name\\\", \\\"otel_scope_schema_url\\\", \\\"otel_scope_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; + let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"host_arch\\\" STRING NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"otel_scope_name\\\" STRING NULL,\\n \\\"otel_scope_schema_url\\\" STRING NULL,\\n \\\"otel_scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"host_arch\\\", \\\"job\\\", \\\"model\\\", \\\"os_version\\\", \\\"otel_scope_name\\\", \\\"otel_scope_schema_url\\\", \\\"otel_scope_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n \'comment\' = 'Created on insertion',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; validate_data( "otlp_metrics_all_show_create_table", &client, @@ -4347,10 +4351,11 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) { // ) // ENGINE=metric // WITH( + // 'comment' = 'Created on insertion', // on_physical_table = 'greptime_physical_table', // otlp_metric_compat = 'prom' // ) - let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_type\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"os_type\\\", \\\"os_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; + let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_type\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"os_type\\\", \\\"os_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n 'comment' = 'Created on insertion',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; validate_data( "otlp_metrics_show_create_table", &client, @@ -4410,10 +4415,11 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) { // ) // ENGINE=metric // WITH( + // 'comment' = 'Created on insertion', // on_physical_table = 'greptime_physical_table', // otlp_metric_compat = 'prom' // ) - let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; + let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n 'comment' = 'Created on insertion',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; validate_data( "otlp_metrics_show_create_table_none", &client, @@ -4613,7 +4619,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) { let expected = r#"[[1736480942444376000,1736480942444499000,123000,null,"c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444376000,1736480942444499000,123000,"d24f921c75f68e23","c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444589000,1736480942444712000,123000,null,"cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444589000,1736480942444712000,123000,"eba7be77e3558179","cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]]]"#; validate_data("otlp_traces", &client, "select * from mytable;", expected).await; - let expected_ddl = r#"[["mytable","CREATE TABLE IF NOT EXISTS \"mytable\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '1',\n trace_id >= '1' AND trace_id < '2',\n trace_id >= '2' AND trace_id < '3',\n trace_id >= '3' AND trace_id < '4',\n trace_id >= '4' AND trace_id < '5',\n trace_id >= '5' AND trace_id < '6',\n trace_id >= '6' AND trace_id < '7',\n trace_id >= '7' AND trace_id < '8',\n trace_id >= '8' AND trace_id < '9',\n trace_id >= '9' AND trace_id < 'a',\n trace_id >= 'a' AND trace_id < 'b',\n trace_id >= 'b' AND trace_id < 'c',\n trace_id >= 'c' AND trace_id < 'd',\n trace_id >= 'd' AND trace_id < 'e',\n trace_id >= 'e' AND trace_id < 'f',\n trace_id >= 'f'\n)\nENGINE=mito\nWITH(\n append_mode = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#; + let expected_ddl = r#"[["mytable","CREATE TABLE IF NOT EXISTS \"mytable\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '1',\n trace_id >= '1' AND trace_id < '2',\n trace_id >= '2' AND trace_id < '3',\n trace_id >= '3' AND trace_id < '4',\n trace_id >= '4' AND trace_id < '5',\n trace_id >= '5' AND trace_id < '6',\n trace_id >= '6' AND trace_id < '7',\n trace_id >= '7' AND trace_id < '8',\n trace_id >= '8' AND trace_id < '9',\n trace_id >= '9' AND trace_id < 'a',\n trace_id >= 'a' AND trace_id < 'b',\n trace_id >= 'b' AND trace_id < 'c',\n trace_id >= 'c' AND trace_id < 'd',\n trace_id >= 'd' AND trace_id < 'e',\n trace_id >= 'e' AND trace_id < 'f',\n trace_id >= 'f'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#; validate_data( "otlp_traces", &client, @@ -4622,7 +4628,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) { ) .await; - let expected_ddl = r#"[["mytable_services","CREATE TABLE IF NOT EXISTS \"mytable_services\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"service_name\" STRING NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\n\nENGINE=mito\nWITH(\n append_mode = 'false'\n)"]]"#; + let expected_ddl = r#"[["mytable_services","CREATE TABLE IF NOT EXISTS \"mytable_services\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"service_name\" STRING NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\n\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'false'\n)"]]"#; validate_data( "otlp_traces", &client, @@ -4865,7 +4871,7 @@ pub async fn test_loki_pb_logs(store_type: StorageType) { assert_eq!(StatusCode::OK, res.status()); // test schema - let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"structured_metadata\\\" JSON NULL,\\n \\\"service\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n \\\"wadaxi\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"service\\\", \\\"source\\\", \\\"wadaxi\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]"; + let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"structured_metadata\\\" JSON NULL,\\n \\\"service\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n \\\"wadaxi\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"service\\\", \\\"source\\\", \\\"wadaxi\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n \'comment\' = 'Created on insertion',\\n append_mode = 'true'\\n)\"]]"; validate_data( "loki_pb_schema", &client, @@ -4997,9 +5003,10 @@ processors: // ) // ENGINE=mito // WITH( + // 'comment' = 'Created on insertion', // append_mode = 'true' // ) - let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"loki_label_service\\\" STRING NULL,\\n \\\"loki_label_source\\\" STRING NULL,\\n \\\"loki_label_wadaxi\\\" STRING NULL,\\n \\\"loki_line\\\" STRING NULL,\\n \\\"loki_metadata_key1\\\" STRING NULL,\\n \\\"loki_metadata_key2\\\" STRING NULL,\\n \\\"loki_metadata_key3\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]"; + let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"loki_label_service\\\" STRING NULL,\\n \\\"loki_label_source\\\" STRING NULL,\\n \\\"loki_label_wadaxi\\\" STRING NULL,\\n \\\"loki_line\\\" STRING NULL,\\n \\\"loki_metadata_key1\\\" STRING NULL,\\n \\\"loki_metadata_key2\\\" STRING NULL,\\n \\\"loki_metadata_key3\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true'\\n)\"]]"; validate_data( "loki_pb_schema", &client, @@ -5069,7 +5076,7 @@ pub async fn test_loki_json_logs(store_type: StorageType) { assert_eq!(StatusCode::OK, res.status()); // test schema - let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"structured_metadata\\\" JSON NULL,\\n \\\"sender\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"sender\\\", \\\"source\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]"; + let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"structured_metadata\\\" JSON NULL,\\n \\\"sender\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"sender\\\", \\\"source\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n \'comment\' = 'Created on insertion',\\n append_mode = 'true'\\n)\"]]"; validate_data( "loki_json_schema", &client, @@ -5170,9 +5177,10 @@ processors: // ) // ENGINE=mito // WITH( + // 'comment' = 'Created on insertion', // append_mode = 'true' // ) - let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"loki_label_sender\\\" STRING NULL,\\n \\\"loki_label_source\\\" STRING NULL,\\n \\\"loki_line\\\" STRING NULL,\\n \\\"loki_metadata_key1\\\" STRING NULL,\\n \\\"loki_metadata_key2\\\" STRING NULL,\\n \\\"loki_metadata_key3\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]"; + let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"loki_label_sender\\\" STRING NULL,\\n \\\"loki_label_source\\\" STRING NULL,\\n \\\"loki_line\\\" STRING NULL,\\n \\\"loki_metadata_key1\\\" STRING NULL,\\n \\\"loki_metadata_key2\\\" STRING NULL,\\n \\\"loki_metadata_key3\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true'\\n)\"]]"; validate_data( "loki_json_schema", &client, diff --git a/tests/cases/distributed/flow-tql/flow_tql.result b/tests/cases/distributed/flow-tql/flow_tql.result index 6afffc6edb7e..b1e62b3317d2 100644 --- a/tests/cases/distributed/flow-tql/flow_tql.result +++ b/tests/cases/distributed/flow-tql/flow_tql.result @@ -15,20 +15,22 @@ Affected Rows: 0 SHOW CREATE TABLE cnt_reqs; -+----------+-------------------------------------------+ -| Table | Create Table | -+----------+-------------------------------------------+ -| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( | -| | "count(http_requests.val)" DOUBLE NULL, | -| | "ts" TIMESTAMP(3) NOT NULL, | -| | "status_code" STRING NULL, | -| | TIME INDEX ("ts"), | -| | PRIMARY KEY ("status_code") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+----------+-------------------------------------------+ ++----------+---------------------------------------------------+ +| Table | Create Table | ++----------+---------------------------------------------------+ +| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( | +| | "count(http_requests.val)" DOUBLE NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "status_code" STRING NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("status_code") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | ++----------+---------------------------------------------------+ -- test if sink table is tql queryable TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs); @@ -157,20 +159,22 @@ Affected Rows: 0 SHOW CREATE TABLE cnt_reqs; -+----------+-------------------------------------------+ -| Table | Create Table | -+----------+-------------------------------------------+ -| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( | -| | "count(http_requests.val)" DOUBLE NULL, | -| | "ts" TIMESTAMP(3) NOT NULL, | -| | "status_code" STRING NULL, | -| | TIME INDEX ("ts"), | -| | PRIMARY KEY ("status_code") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+----------+-------------------------------------------+ ++----------+---------------------------------------------------+ +| Table | Create Table | ++----------+---------------------------------------------------+ +| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( | +| | "count(http_requests.val)" DOUBLE NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "status_code" STRING NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("status_code") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | ++----------+---------------------------------------------------+ -- test if sink table is tql queryable TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs); @@ -258,7 +262,9 @@ SHOW CREATE TABLE rate_reqs; | | ) | | | | | | ENGINE=mito | -| | | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | +-----------+-----------------------------------------------------------+ -- test if sink table is tql queryable @@ -337,7 +343,9 @@ SHOW CREATE TABLE rate_reqs; | | ) | | | | | | ENGINE=mito | -| | | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | +-----------+-----------------------------------------------------------+ -- test if sink table is tql queryable diff --git a/tests/cases/standalone/common/comment.result b/tests/cases/standalone/common/comment.result new file mode 100644 index 000000000000..19f9b8776b9b --- /dev/null +++ b/tests/cases/standalone/common/comment.result @@ -0,0 +1,184 @@ +-- Test: COMMENT ON TABLE add & remove +CREATE TABLE comment_table_test ( + pk INT, + val DOUBLE, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY(pk) +); + +Affected Rows: 0 + +-- Add table comment +COMMENT ON TABLE comment_table_test IS 'table level description'; + +Affected Rows: 0 + +SHOW CREATE TABLE comment_table_test; + ++--------------------+---------------------------------------------------+ +| Table | Create Table | ++--------------------+---------------------------------------------------+ +| comment_table_test | CREATE TABLE IF NOT EXISTS "comment_table_test" ( | +| | "pk" INT NULL, | +| | "val" DOUBLE NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("pk") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | comment = 'table level description' | +| | ) | ++--------------------+---------------------------------------------------+ + +-- Remove table comment +COMMENT ON TABLE comment_table_test IS NULL; + +Affected Rows: 0 + +SHOW CREATE TABLE comment_table_test; + ++--------------------+---------------------------------------------------+ +| Table | Create Table | ++--------------------+---------------------------------------------------+ +| comment_table_test | CREATE TABLE IF NOT EXISTS "comment_table_test" ( | +| | "pk" INT NULL, | +| | "val" DOUBLE NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("pk") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++--------------------+---------------------------------------------------+ + +DROP TABLE comment_table_test; + +Affected Rows: 0 + +-- Test: COMMENT ON COLUMN add & remove +CREATE TABLE comment_column_test ( + pk INT, + val DOUBLE, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY(pk) +); + +Affected Rows: 0 + +-- Add column comment +COMMENT ON COLUMN comment_column_test.val IS 'value column description'; + +Affected Rows: 0 + +SHOW CREATE TABLE comment_column_test; + ++---------------------+---------------------------------------------------------+ +| Table | Create Table | ++---------------------+---------------------------------------------------------+ +| comment_column_test | CREATE TABLE IF NOT EXISTS "comment_column_test" ( | +| | "pk" INT NULL, | +| | "val" DOUBLE NULL COMMENT 'value column description', | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("pk") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++---------------------+---------------------------------------------------------+ + +-- Remove column comment +COMMENT ON COLUMN comment_column_test.val IS NULL; + +Affected Rows: 0 + +SHOW CREATE TABLE comment_column_test; + ++---------------------+----------------------------------------------------+ +| Table | Create Table | ++---------------------+----------------------------------------------------+ +| comment_column_test | CREATE TABLE IF NOT EXISTS "comment_column_test" ( | +| | "pk" INT NULL, | +| | "val" DOUBLE NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("pk") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++---------------------+----------------------------------------------------+ + +DROP TABLE comment_column_test; + +Affected Rows: 0 + +-- Test: COMMENT ON FLOW add & remove +-- Prepare source & sink tables +CREATE TABLE flow_source_comment_test ( + desc_str STRING, + ts TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +CREATE TABLE flow_sink_comment_test ( + desc_str STRING, + ts TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +CREATE FLOW flow_comment_test +SINK TO flow_sink_comment_test +AS +SELECT desc_str, ts FROM flow_source_comment_test; + +Affected Rows: 0 + +-- Add flow comment +COMMENT ON FLOW flow_comment_test IS 'flow level description'; + +Affected Rows: 0 + +SHOW CREATE FLOW flow_comment_test; + ++-------------------+------------------------------------------------------+ +| Flow | Create Flow | ++-------------------+------------------------------------------------------+ +| flow_comment_test | CREATE FLOW IF NOT EXISTS flow_comment_test | +| | SINK TO flow_sink_comment_test | +| | COMMENT 'flow level description' | +| | AS SELECT desc_str, ts FROM flow_source_comment_test | ++-------------------+------------------------------------------------------+ + +-- Remove flow comment +COMMENT ON FLOW flow_comment_test IS NULL; + +Affected Rows: 0 + +SHOW CREATE FLOW flow_comment_test; + ++-------------------+------------------------------------------------------+ +| Flow | Create Flow | ++-------------------+------------------------------------------------------+ +| flow_comment_test | CREATE FLOW IF NOT EXISTS flow_comment_test | +| | SINK TO flow_sink_comment_test | +| | AS SELECT desc_str, ts FROM flow_source_comment_test | ++-------------------+------------------------------------------------------+ + +DROP FLOW flow_comment_test; + +Affected Rows: 0 + +DROP TABLE flow_source_comment_test; + +Affected Rows: 0 + +DROP TABLE flow_sink_comment_test; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/comment.sql b/tests/cases/standalone/common/comment.sql new file mode 100644 index 000000000000..564480563bde --- /dev/null +++ b/tests/cases/standalone/common/comment.sql @@ -0,0 +1,65 @@ +-- Test: COMMENT ON TABLE add & remove +CREATE TABLE comment_table_test ( + pk INT, + val DOUBLE, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY(pk) +); + +-- Add table comment +COMMENT ON TABLE comment_table_test IS 'table level description'; +SHOW CREATE TABLE comment_table_test; + +-- Remove table comment +COMMENT ON TABLE comment_table_test IS NULL; +SHOW CREATE TABLE comment_table_test; + +DROP TABLE comment_table_test; + +-- Test: COMMENT ON COLUMN add & remove +CREATE TABLE comment_column_test ( + pk INT, + val DOUBLE, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY(pk) +); + +-- Add column comment +COMMENT ON COLUMN comment_column_test.val IS 'value column description'; +SHOW CREATE TABLE comment_column_test; + +-- Remove column comment +COMMENT ON COLUMN comment_column_test.val IS NULL; +SHOW CREATE TABLE comment_column_test; + +DROP TABLE comment_column_test; + +-- Test: COMMENT ON FLOW add & remove +-- Prepare source & sink tables +CREATE TABLE flow_source_comment_test ( + desc_str STRING, + ts TIMESTAMP TIME INDEX +); + +CREATE TABLE flow_sink_comment_test ( + desc_str STRING, + ts TIMESTAMP TIME INDEX +); + +CREATE FLOW flow_comment_test +SINK TO flow_sink_comment_test +AS +SELECT desc_str, ts FROM flow_source_comment_test; + +-- Add flow comment +COMMENT ON FLOW flow_comment_test IS 'flow level description'; +SHOW CREATE FLOW flow_comment_test; + +-- Remove flow comment +COMMENT ON FLOW flow_comment_test IS NULL; +SHOW CREATE FLOW flow_comment_test; + +DROP FLOW flow_comment_test; +DROP TABLE flow_source_comment_test; +DROP TABLE flow_sink_comment_test; + diff --git a/tests/cases/standalone/common/flow/flow_advance_ttl.result b/tests/cases/standalone/common/flow/flow_advance_ttl.result index 05ae665be86c..12b27ace13d8 100644 --- a/tests/cases/standalone/common/flow/flow_advance_ttl.result +++ b/tests/cases/standalone/common/flow/flow_advance_ttl.result @@ -46,6 +46,7 @@ SHOW CREATE TABLE distinct_basic; | | ) | +----------------+-----------------------------------------------------------+ +-- SQLNESS REPLACE \d{4} REDACTED SHOW CREATE TABLE out_distinct_basic; +--------------------+---------------------------------------------------+ @@ -60,7 +61,9 @@ SHOW CREATE TABLE out_distinct_basic; | | ) | | | | | | ENGINE=mito | -| | | +| | WITH( | +| | 'comment' = 'Sink table for flow flow-id=REDACTED' | +| | ) | +--------------------+---------------------------------------------------+ -- SQLNESS SLEEP 3s @@ -242,7 +245,9 @@ SHOW CREATE TABLE out_distinct_basic; | | ) | | | | | | ENGINE=mito | -| | | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | +--------------------+---------------------------------------------------+ SELECT diff --git a/tests/cases/standalone/common/flow/flow_advance_ttl.sql b/tests/cases/standalone/common/flow/flow_advance_ttl.sql index 141c595e8958..9574eabd918c 100644 --- a/tests/cases/standalone/common/flow/flow_advance_ttl.sql +++ b/tests/cases/standalone/common/flow/flow_advance_ttl.sql @@ -20,6 +20,7 @@ SELECT flow_name, options FROM INFORMATION_SCHEMA.FLOWS; SHOW CREATE TABLE distinct_basic; +-- SQLNESS REPLACE \d{4} REDACTED SHOW CREATE TABLE out_distinct_basic; -- SQLNESS SLEEP 3s diff --git a/tests/cases/standalone/common/flow/flow_auto_sink_table.result b/tests/cases/standalone/common/flow/flow_auto_sink_table.result index f1d229e6e8c7..90d53b9598ac 100644 --- a/tests/cases/standalone/common/flow/flow_auto_sink_table.result +++ b/tests/cases/standalone/common/flow/flow_auto_sink_table.result @@ -20,19 +20,21 @@ Affected Rows: 0 SHOW CREATE TABLE out_num_cnt_basic; -+-------------------+--------------------------------------------------+ -| Table | Create Table | -+-------------------+--------------------------------------------------+ -| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | -| | "sum(numbers_input_basic.number)" BIGINT NULL, | -| | "time_window" TIMESTAMP(9) NOT NULL, | -| | "update_at" TIMESTAMP(3) NULL, | -| | TIME INDEX ("time_window") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+-------------------+--------------------------------------------------+ ++-------------------+---------------------------------------------------+ +| Table | Create Table | ++-------------------+---------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "sum(numbers_input_basic.number)" BIGINT NULL, | +| | "time_window" TIMESTAMP(9) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | ++-------------------+---------------------------------------------------+ -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('test_numbers_basic'); @@ -55,19 +57,21 @@ SELECT 1; -- SQLNESS SLEEP 3s SHOW CREATE TABLE out_num_cnt_basic; -+-------------------+--------------------------------------------------+ -| Table | Create Table | -+-------------------+--------------------------------------------------+ -| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | -| | "sum(numbers_input_basic.number)" BIGINT NULL, | -| | "time_window" TIMESTAMP(9) NOT NULL, | -| | "update_at" TIMESTAMP(3) NULL, | -| | TIME INDEX ("time_window") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+-------------------+--------------------------------------------------+ ++-------------------+---------------------------------------------------+ +| Table | Create Table | ++-------------------+---------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "sum(numbers_input_basic.number)" BIGINT NULL, | +| | "time_window" TIMESTAMP(9) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | ++-------------------+---------------------------------------------------+ SHOW CREATE FLOW test_numbers_basic; @@ -122,19 +126,21 @@ SELECT 1; -- SQLNESS SLEEP 3s SHOW CREATE TABLE out_num_cnt_basic; -+-------------------+--------------------------------------------------+ -| Table | Create Table | -+-------------------+--------------------------------------------------+ -| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | -| | "sumup" BIGINT NULL, | -| | "event_time" TIMESTAMP(3) NOT NULL, | -| | "update_at" TIMESTAMP(3) NULL, | -| | TIME INDEX ("event_time") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+-------------------+--------------------------------------------------+ ++-------------------+---------------------------------------------------+ +| Table | Create Table | ++-------------------+---------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "sumup" BIGINT NULL, | +| | "event_time" TIMESTAMP(3) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("event_time") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | ++-------------------+---------------------------------------------------+ -- SQLNESS ARG restart=true SELECT 1; @@ -158,19 +164,21 @@ SHOW CREATE FLOW test_numbers_basic; SHOW CREATE TABLE out_num_cnt_basic; -+-------------------+--------------------------------------------------+ -| Table | Create Table | -+-------------------+--------------------------------------------------+ -| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | -| | "sumup" BIGINT NULL, | -| | "event_time" TIMESTAMP(3) NOT NULL, | -| | "update_at" TIMESTAMP(3) NULL, | -| | TIME INDEX ("event_time") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+-------------------+--------------------------------------------------+ ++-------------------+---------------------------------------------------+ +| Table | Create Table | ++-------------------+---------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "sumup" BIGINT NULL, | +| | "event_time" TIMESTAMP(3) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("event_time") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | ++-------------------+---------------------------------------------------+ DROP FLOW test_numbers_basic; diff --git a/tests/cases/standalone/common/flow/flow_basic.result b/tests/cases/standalone/common/flow/flow_basic.result index e089af178127..ef03d16b99c5 100644 --- a/tests/cases/standalone/common/flow/flow_basic.result +++ b/tests/cases/standalone/common/flow/flow_basic.result @@ -20,19 +20,21 @@ Affected Rows: 0 SHOW CREATE TABLE out_num_cnt_basic; -+-------------------+--------------------------------------------------+ -| Table | Create Table | -+-------------------+--------------------------------------------------+ -| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | -| | "sum(numbers_input_basic.number)" BIGINT NULL, | -| | "time_window" TIMESTAMP(9) NOT NULL, | -| | "update_at" TIMESTAMP(3) NULL, | -| | TIME INDEX ("time_window") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+-------------------+--------------------------------------------------+ ++-------------------+---------------------------------------------------+ +| Table | Create Table | ++-------------------+---------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "sum(numbers_input_basic.number)" BIGINT NULL, | +| | "time_window" TIMESTAMP(9) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | ++-------------------+---------------------------------------------------+ -- TODO(discord9): confirm if it's necessary to flush flow here? -- because flush_flow result is at most 1 @@ -47,19 +49,21 @@ ADMIN FLUSH_FLOW('test_numbers_basic'); SHOW CREATE TABLE out_num_cnt_basic; -+-------------------+--------------------------------------------------+ -| Table | Create Table | -+-------------------+--------------------------------------------------+ -| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | -| | "sum(numbers_input_basic.number)" BIGINT NULL, | -| | "time_window" TIMESTAMP(9) NOT NULL, | -| | "update_at" TIMESTAMP(3) NULL, | -| | TIME INDEX ("time_window") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+-------------------+--------------------------------------------------+ ++-------------------+---------------------------------------------------+ +| Table | Create Table | ++-------------------+---------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "sum(numbers_input_basic.number)" BIGINT NULL, | +| | "time_window" TIMESTAMP(9) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | ++-------------------+---------------------------------------------------+ -- SQLNESS ARG restart=true SELECT 1; @@ -172,19 +176,21 @@ Affected Rows: 0 SHOW CREATE TABLE out_basic; -+-----------+---------------------------------------------+ -| Table | Create Table | -+-----------+---------------------------------------------+ -| out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( | -| | "wildcard" BIGINT NULL, | -| | "update_at" TIMESTAMP(3) NULL, | -| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | -| | TIME INDEX ("__ts_placeholder") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+-----------+---------------------------------------------+ ++-----------+---------------------------------------------------+ +| Table | Create Table | ++-----------+---------------------------------------------------+ +| out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( | +| | "wildcard" BIGINT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | ++-----------+---------------------------------------------------+ DROP FLOW test_wildcard_basic; @@ -200,19 +206,21 @@ Affected Rows: 0 SHOW CREATE TABLE out_basic; -+-----------+---------------------------------------------+ -| Table | Create Table | -+-----------+---------------------------------------------+ -| out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( | -| | "wildcard" BIGINT NULL, | -| | "update_at" TIMESTAMP(3) NULL, | -| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | -| | TIME INDEX ("__ts_placeholder") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+-----------+---------------------------------------------+ ++-----------+---------------------------------------------------+ +| Table | Create Table | ++-----------+---------------------------------------------------+ +| out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( | +| | "wildcard" BIGINT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | ++-----------+---------------------------------------------------+ -- SQLNESS ARG restart=true SELECT 1; @@ -243,19 +251,21 @@ ADMIN FLUSH_FLOW('test_wildcard_basic'); SHOW CREATE TABLE out_basic; -+-----------+---------------------------------------------+ -| Table | Create Table | -+-----------+---------------------------------------------+ -| out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( | -| | "wildcard" BIGINT NULL, | -| | "update_at" TIMESTAMP(3) NULL, | -| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | -| | TIME INDEX ("__ts_placeholder") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+-----------+---------------------------------------------+ ++-----------+---------------------------------------------------+ +| Table | Create Table | ++-----------+---------------------------------------------------+ +| out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( | +| | "wildcard" BIGINT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | ++-----------+---------------------------------------------------+ SELECT wildcard FROM out_basic; @@ -309,7 +319,9 @@ SHOW CREATE TABLE out_distinct_basic; | | ) | | | | | | ENGINE=mito | -| | | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | +--------------------+---------------------------------------------------+ -- TODO(discord9): confirm if it's necessary to flush flow here? @@ -365,7 +377,9 @@ SHOW CREATE TABLE out_distinct_basic; | | ) | | | | | | ENGINE=mito | -| | | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | +--------------------+---------------------------------------------------+ SELECT @@ -637,20 +651,22 @@ Affected Rows: 0 SHOW CREATE TABLE ngx_country; -+-------------+---------------------------------------------+ -| Table | Create Table | -+-------------+---------------------------------------------+ -| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( | -| | "country" STRING NULL, | -| | "update_at" TIMESTAMP(3) NULL, | -| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | -| | TIME INDEX ("__ts_placeholder"), | -| | PRIMARY KEY ("country") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+-------------+---------------------------------------------+ ++-------------+---------------------------------------------------+ +| Table | Create Table | ++-------------+---------------------------------------------------+ +| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( | +| | "country" STRING NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder"), | +| | PRIMARY KEY ("country") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | ++-------------+---------------------------------------------------+ INSERT INTO ngx_access_log @@ -670,20 +686,22 @@ ADMIN FLUSH_FLOW('calc_ngx_country'); SHOW CREATE TABLE ngx_country; -+-------------+---------------------------------------------+ -| Table | Create Table | -+-------------+---------------------------------------------+ -| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( | -| | "country" STRING NULL, | -| | "update_at" TIMESTAMP(3) NULL, | -| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | -| | TIME INDEX ("__ts_placeholder"), | -| | PRIMARY KEY ("country") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+-------------+---------------------------------------------+ ++-------------+---------------------------------------------------+ +| Table | Create Table | ++-------------+---------------------------------------------------+ +| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( | +| | "country" STRING NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder"), | +| | PRIMARY KEY ("country") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | ++-------------+---------------------------------------------------+ SELECT country @@ -787,20 +805,22 @@ Affected Rows: 0 SHOW CREATE TABLE ngx_country; -+-------------+--------------------------------------------+ -| Table | Create Table | -+-------------+--------------------------------------------+ -| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( | -| | "country" STRING NULL, | -| | "time_window" TIMESTAMP(3) NOT NULL, | -| | "update_at" TIMESTAMP(3) NULL, | -| | TIME INDEX ("time_window"), | -| | PRIMARY KEY ("country") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+-------------+--------------------------------------------+ ++-------------+---------------------------------------------------+ +| Table | Create Table | ++-------------+---------------------------------------------------+ +| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( | +| | "country" STRING NULL, | +| | "time_window" TIMESTAMP(3) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window"), | +| | PRIMARY KEY ("country") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | ++-------------+---------------------------------------------------+ INSERT INTO ngx_access_log @@ -820,20 +840,22 @@ ADMIN FLUSH_FLOW('calc_ngx_country'); SHOW CREATE TABLE ngx_country; -+-------------+--------------------------------------------+ -| Table | Create Table | -+-------------+--------------------------------------------+ -| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( | -| | "country" STRING NULL, | -| | "time_window" TIMESTAMP(3) NOT NULL, | -| | "update_at" TIMESTAMP(3) NULL, | -| | TIME INDEX ("time_window"), | -| | PRIMARY KEY ("country") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+-------------+--------------------------------------------+ ++-------------+---------------------------------------------------+ +| Table | Create Table | ++-------------+---------------------------------------------------+ +| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( | +| | "country" STRING NULL, | +| | "time_window" TIMESTAMP(3) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window"), | +| | PRIMARY KEY ("country") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | ++-------------+---------------------------------------------------+ SELECT country, @@ -1673,19 +1695,21 @@ Affected Rows: 0 SHOW CREATE TABLE out_num_cnt_basic; -+-------------------+--------------------------------------------------+ -| Table | Create Table | -+-------------------+--------------------------------------------------+ -| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | -| | "avg_after_filter_num" BIGINT NULL, | -| | "update_at" TIMESTAMP(3) NULL, | -| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | -| | TIME INDEX ("__ts_placeholder") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+-------------------+--------------------------------------------------+ ++-------------------+---------------------------------------------------+ +| Table | Create Table | ++-------------------+---------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "avg_after_filter_num" BIGINT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | ++-------------------+---------------------------------------------------+ -- TODO(discord9): confirm if it's necessary to flush flow here? -- because flush_flow result is at most 1 diff --git a/tests/cases/standalone/flow-tql/flow_tql.result b/tests/cases/standalone/flow-tql/flow_tql.result index 6fb9386e83b5..1661eacf2484 100644 --- a/tests/cases/standalone/flow-tql/flow_tql.result +++ b/tests/cases/standalone/flow-tql/flow_tql.result @@ -15,20 +15,22 @@ Affected Rows: 0 SHOW CREATE TABLE cnt_reqs; -+----------+-------------------------------------------+ -| Table | Create Table | -+----------+-------------------------------------------+ -| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( | -| | "count(http_requests.val)" DOUBLE NULL, | -| | "ts" TIMESTAMP(3) NOT NULL, | -| | "status_code" STRING NULL, | -| | TIME INDEX ("ts"), | -| | PRIMARY KEY ("status_code") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+----------+-------------------------------------------+ ++----------+---------------------------------------------------+ +| Table | Create Table | ++----------+---------------------------------------------------+ +| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( | +| | "count(http_requests.val)" DOUBLE NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "status_code" STRING NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("status_code") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | ++----------+---------------------------------------------------+ -- test if sink table is tql queryable TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs); @@ -157,20 +159,22 @@ Affected Rows: 0 SHOW CREATE TABLE cnt_reqs; -+----------+-------------------------------------------+ -| Table | Create Table | -+----------+-------------------------------------------+ -| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( | -| | "count(http_requests.val)" DOUBLE NULL, | -| | "ts" TIMESTAMP(3) NOT NULL, | -| | "status_code" STRING NULL, | -| | TIME INDEX ("ts"), | -| | PRIMARY KEY ("status_code") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+----------+-------------------------------------------+ ++----------+---------------------------------------------------+ +| Table | Create Table | ++----------+---------------------------------------------------+ +| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( | +| | "count(http_requests.val)" DOUBLE NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "status_code" STRING NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("status_code") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | ++----------+---------------------------------------------------+ -- test if sink table is tql queryable TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs); @@ -258,7 +262,9 @@ SHOW CREATE TABLE rate_reqs; | | ) | | | | | | ENGINE=mito | -| | | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | +-----------+-----------------------------------------------------------+ -- test if sink table is tql queryable @@ -337,7 +343,9 @@ SHOW CREATE TABLE rate_reqs; | | ) | | | | | | ENGINE=mito | -| | | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | +-----------+-----------------------------------------------------------+ -- test if sink table is tql queryable