diff --git a/query_engine/src/executor.rs b/query_engine/src/executor.rs index 1c4b245b23..2cf418f35b 100644 --- a/query_engine/src/executor.rs +++ b/query_engine/src/executor.rs @@ -2,12 +2,13 @@ //! Query executor -use std::sync::Arc; +use std::{sync::Arc, time::Instant}; use async_trait::async_trait; use common_types::record_batch::RecordBatch; +use common_util::time::InstantExt; use futures::TryStreamExt; -use log::debug; +use log::{debug, info}; use snafu::{ResultExt, Snafu}; use sql::{plan::QueryPlan, provider::CatalogProviderAdapter}; use table_engine::stream::SendableRecordBatchStream; @@ -92,6 +93,7 @@ impl Executor for ExecutorImpl { df_ctx.register_catalog(&name, Arc::new(catalog)); } let request_id = ctx.request_id(); + let begin_instant = Instant::now(); let physical_plan = optimize_plan(ctx, plan).await?; @@ -106,9 +108,10 @@ impl Executor for ExecutorImpl { // calculation let record_batches = collect(stream).await?; - debug!( - "Executor executed plan, request_id:{}, plan_and_metrics: {}", + info!( + "Executor executed plan, request_id:{}, cost:{}ms, plan_and_metrics: {}", request_id, + begin_instant.saturating_elapsed().as_millis(), physical_plan.metrics_to_string() ); diff --git a/server/src/grpc/query.rs b/server/src/grpc/query.rs index 94a16a31a8..bc06847d7a 100644 --- a/server/src/grpc/query.rs +++ b/server/src/grpc/query.rs @@ -153,7 +153,7 @@ pub async fn fetch_query_output( })?; info!( - "Grpc handle query success, catalog:{}, tenant:{}, request_id:{}, cost:{}, request:{:?}", + "Grpc handle query success, catalog:{}, tenant:{}, request_id:{}, cost:{}ms, request:{:?}", ctx.catalog(), ctx.tenant(), request_id, diff --git a/server/src/handlers/sql.rs b/server/src/handlers/sql.rs index a78b41f7d0..a6ac05411a 100644 --- a/server/src/handlers/sql.rs +++ b/server/src/handlers/sql.rs @@ -2,11 +2,14 @@ //! SQL request handler +use std::time::Instant; + use arrow_deps::arrow::error::Result as ArrowResult; use common_types::{ datum::{Datum, DatumKind}, request_id::RequestId, }; +use common_util::time::InstantExt; use interpreters::{context::Context as InterpreterContext, factory::Factory, interpreter::Output}; use log::info; use query_engine::executor::RecordBatchVec; @@ -101,6 +104,7 @@ pub async fn handle_sql( request: Request, ) -> Result { let request_id = RequestId::next_id(); + let begin_instant = Instant::now(); info!( "sql handler try to process request, request_id:{}, request:{:?}", request_id, request @@ -168,7 +172,9 @@ pub async fn handle_sql( })?; info!( - "sql handler finished processing request, request:{:?}", + "sql handler finished, request_id:{}, cost:{}ms, request:{:?}", + request_id, + begin_instant.saturating_elapsed().as_millis(), request ); diff --git a/sql/src/planner.rs b/sql/src/planner.rs index 93a4d523b3..187ab87bea 100644 --- a/sql/src/planner.rs +++ b/sql/src/planner.rs @@ -265,6 +265,15 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> { })) } + fn tsid_column_schema() -> Result { + column_schema::Builder::new(TSID_COLUMN.to_string(), DatumKind::UInt64) + .is_nullable(false) + .build() + .context(InvalidColumnSchema { + column_name: TSID_COLUMN, + }) + } + fn create_table_to_plan(&self, stmt: CreateTable) -> Result { ensure!(!stmt.table_name.is_empty(), CreateTableNameEmpty); @@ -279,12 +288,13 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> { let mut schema_builder = schema::Builder::with_capacity(stmt.columns.len()).auto_increment_column_id(true); - let mut name_column_map = BTreeMap::new(); // Build all column schemas. - for col in &stmt.columns { - name_column_map.insert(col.name.value.as_str(), parse_column(col)?); - } + let mut name_column_map = stmt + .columns + .iter() + .map(|col| Ok((col.name.value.as_str(), parse_column(col)?))) + .collect::>>()?; // Tsid column is a reserved column. ensure!( @@ -335,11 +345,16 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> { // If primary key is already provided, use that primary key. if let TableConstraint::Unique { columns, .. } = &stmt.constraints[idx] { for col in columns { - let key_column = name_column_map.remove(&*col.value).with_context(|| { - PrimaryKeyNotFound { - name: col.value.clone(), - } - })?; + let key_column = if TSID_COLUMN == col.value { + schema_builder = schema_builder.enable_tsid_primary_key(true); + Self::tsid_column_schema()? + } else { + name_column_map + .remove(&*col.value) + .with_context(|| PrimaryKeyNotFound { + name: col.value.clone(), + })? + }; // The schema builder will checks there is only one timestamp column in primary // key. schema_builder = schema_builder @@ -354,13 +369,7 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> { name: ×tamp_name, }, )?; - let column_schema = - column_schema::Builder::new(TSID_COLUMN.to_string(), DatumKind::UInt64) - .is_nullable(false) - .build() - .context(InvalidColumnSchema { - column_name: TSID_COLUMN, - })?; + let column_schema = Self::tsid_column_schema()?; schema_builder = schema_builder .enable_tsid_primary_key(true) .add_key_column(timestamp_column)