Skip to content

Commit

Permalink
feat: primary key definition support specify tsid column (apache#254)
Browse files Browse the repository at this point in the history
* add more metrics

* support custom primary key
  • Loading branch information
jiacai2050 authored Sep 15, 2022
1 parent 5c14f00 commit 25557c7
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 22 deletions.
11 changes: 7 additions & 4 deletions query_engine/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

//! Query executor
use std::sync::Arc;
use std::{sync::Arc, time::Instant};

use async_trait::async_trait;
use common_types::record_batch::RecordBatch;
use common_util::time::InstantExt;
use futures::TryStreamExt;
use log::debug;
use log::{debug, info};
use snafu::{ResultExt, Snafu};
use sql::{plan::QueryPlan, provider::CatalogProviderAdapter};
use table_engine::stream::SendableRecordBatchStream;
Expand Down Expand Up @@ -92,6 +93,7 @@ impl Executor for ExecutorImpl {
df_ctx.register_catalog(&name, Arc::new(catalog));
}
let request_id = ctx.request_id();
let begin_instant = Instant::now();

let physical_plan = optimize_plan(ctx, plan).await?;

Expand All @@ -106,9 +108,10 @@ impl Executor for ExecutorImpl {
// calculation
let record_batches = collect(stream).await?;

debug!(
"Executor executed plan, request_id:{}, plan_and_metrics: {}",
info!(
"Executor executed plan, request_id:{}, cost:{}ms, plan_and_metrics: {}",
request_id,
begin_instant.saturating_elapsed().as_millis(),
physical_plan.metrics_to_string()
);

Expand Down
2 changes: 1 addition & 1 deletion server/src/grpc/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ pub async fn fetch_query_output<Q: QueryExecutor + 'static>(
})?;

info!(
"Grpc handle query success, catalog:{}, tenant:{}, request_id:{}, cost:{}, request:{:?}",
"Grpc handle query success, catalog:{}, tenant:{}, request_id:{}, cost:{}ms, request:{:?}",
ctx.catalog(),
ctx.tenant(),
request_id,
Expand Down
8 changes: 7 additions & 1 deletion server/src/handlers/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

//! SQL request handler
use std::time::Instant;

use arrow_deps::arrow::error::Result as ArrowResult;
use common_types::{
datum::{Datum, DatumKind},
request_id::RequestId,
};
use common_util::time::InstantExt;
use interpreters::{context::Context as InterpreterContext, factory::Factory, interpreter::Output};
use log::info;
use query_engine::executor::RecordBatchVec;
Expand Down Expand Up @@ -101,6 +104,7 @@ pub async fn handle_sql<Q: QueryExecutor + 'static>(
request: Request,
) -> Result<Response> {
let request_id = RequestId::next_id();
let begin_instant = Instant::now();
info!(
"sql handler try to process request, request_id:{}, request:{:?}",
request_id, request
Expand Down Expand Up @@ -168,7 +172,9 @@ pub async fn handle_sql<Q: QueryExecutor + 'static>(
})?;

info!(
"sql handler finished processing request, request:{:?}",
"sql handler finished, request_id:{}, cost:{}ms, request:{:?}",
request_id,
begin_instant.saturating_elapsed().as_millis(),
request
);

Expand Down
41 changes: 25 additions & 16 deletions sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,15 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> {
}))
}

/// Builds the schema for the reserved `tsid` column: a non-nullable
/// `UInt64` column whose name is taken from the `TSID_COLUMN` constant.
///
/// Shared by both code paths that install the tsid column (user-specified
/// primary key containing `tsid`, and the default auto-generated key).
///
/// # Errors
/// Returns an `InvalidColumnSchema` error (with the column name attached
/// as context) if the column-schema builder rejects the definition.
fn tsid_column_schema() -> Result<ColumnSchema> {
    column_schema::Builder::new(TSID_COLUMN.to_string(), DatumKind::UInt64)
        .is_nullable(false)
        .build()
        .context(InvalidColumnSchema {
            column_name: TSID_COLUMN,
        })
}

fn create_table_to_plan(&self, stmt: CreateTable) -> Result<Plan> {
ensure!(!stmt.table_name.is_empty(), CreateTableNameEmpty);

Expand All @@ -279,12 +288,13 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> {

let mut schema_builder =
schema::Builder::with_capacity(stmt.columns.len()).auto_increment_column_id(true);
let mut name_column_map = BTreeMap::new();

// Build all column schemas.
for col in &stmt.columns {
name_column_map.insert(col.name.value.as_str(), parse_column(col)?);
}
let mut name_column_map = stmt
.columns
.iter()
.map(|col| Ok((col.name.value.as_str(), parse_column(col)?)))
.collect::<Result<BTreeMap<_, _>>>()?;

// Tsid column is a reserved column.
ensure!(
Expand Down Expand Up @@ -335,11 +345,16 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> {
// If primary key is already provided, use that primary key.
if let TableConstraint::Unique { columns, .. } = &stmt.constraints[idx] {
for col in columns {
let key_column = name_column_map.remove(&*col.value).with_context(|| {
PrimaryKeyNotFound {
name: col.value.clone(),
}
})?;
let key_column = if TSID_COLUMN == col.value {
schema_builder = schema_builder.enable_tsid_primary_key(true);
Self::tsid_column_schema()?
} else {
name_column_map
.remove(&*col.value)
.with_context(|| PrimaryKeyNotFound {
name: col.value.clone(),
})?
};
// The schema builder will check that there is only one timestamp column in
// the primary key.
schema_builder = schema_builder
Expand All @@ -354,13 +369,7 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> {
name: &timestamp_name,
},
)?;
let column_schema =
column_schema::Builder::new(TSID_COLUMN.to_string(), DatumKind::UInt64)
.is_nullable(false)
.build()
.context(InvalidColumnSchema {
column_name: TSID_COLUMN,
})?;
let column_schema = Self::tsid_column_schema()?;
schema_builder = schema_builder
.enable_tsid_primary_key(true)
.add_key_column(timestamp_column)
Expand Down

0 comments on commit 25557c7

Please sign in to comment.