From 1448bce55e93252d160e39da500c9409671d14f9 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Tue, 8 Mar 2022 13:04:21 -0500 Subject: [PATCH 1/7] Start adding Create Schema --- datafusion/src/logical_plan/mod.rs | 6 +++--- datafusion/src/logical_plan/plan.rs | 32 +++++++++++++++++++++++++---- datafusion/src/sql/planner.rs | 15 ++++++++++---- 3 files changed, 42 insertions(+), 11 deletions(-) diff --git a/datafusion/src/logical_plan/mod.rs b/datafusion/src/logical_plan/mod.rs index 24d6723210c7..91db89089345 100644 --- a/datafusion/src/logical_plan/mod.rs +++ b/datafusion/src/logical_plan/mod.rs @@ -62,9 +62,9 @@ pub use expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion}; pub use extension::UserDefinedLogicalNode; pub use operators::Operator; pub use plan::{ - CreateExternalTable, CreateMemoryTable, CrossJoin, DropTable, EmptyRelation, - JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, PlanVisitor, - Repartition, TableScan, Union, Values, + CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, CrossJoin, DropTable, + EmptyRelation, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, + PlanVisitor, Repartition, TableScan, Union, Values, }; pub(crate) use plan::{StringifiedPlan, ToStringifiedPlan}; pub use registry::FunctionRegistry; diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index 60d4845dbb39..3f6b2440c8aa 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -185,6 +185,17 @@ pub struct CreateExternalTable { pub has_header: bool, } +/// Creates a schema. +#[derive(Clone)] +pub struct CreateCatalogSchema { + /// The table schema + pub schema_name: String, + /// The table name + pub if_not_exists: bool, + /// Empty schema + pub schema: DFSchemaRef, +} + /// Drops a table. #[derive(Clone)] pub struct DropTable { @@ -346,6 +357,8 @@ pub enum LogicalPlan { CreateExternalTable(CreateExternalTable), /// Creates an in memory table. CreateMemoryTable(CreateMemoryTable), + /// Creates an in memory table. + CreateCatalogSchema(CreateCatalogSchema), /// Drops a table. DropTable(DropTable), /// Values expression. See @@ -390,6 +403,9 @@ impl LogicalPlan { LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => { input.schema() } + LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { schema, .. }) => { + schema + } LogicalPlan::DropTable(DropTable { schema, .. }) => schema, } } @@ -431,7 +447,8 @@ impl LogicalPlan { LogicalPlan::Explain(Explain { schema, .. }) | LogicalPlan::Analyze(Analyze { schema, .. }) | LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) - | LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => { + | LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) + | LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { schema, .. }) => { vec![schema] } LogicalPlan::Limit(Limit { input, .. }) @@ -486,6 +503,7 @@ impl LogicalPlan { | LogicalPlan::Limit(_) | LogicalPlan::CreateExternalTable(_) | LogicalPlan::CreateMemoryTable(_) + | LogicalPlan::CreateCatalogSchema(_) | LogicalPlan::DropTable(_) | LogicalPlan::CrossJoin(_) | LogicalPlan::Analyze { .. } @@ -521,6 +539,7 @@ impl LogicalPlan { | LogicalPlan::EmptyRelation { .. } | LogicalPlan::Values { .. } | LogicalPlan::CreateExternalTable(_) + | LogicalPlan::CreateCatalogSchema(_) | LogicalPlan::DropTable(_) => vec![], } } @@ -673,6 +692,7 @@ impl LogicalPlan { | LogicalPlan::EmptyRelation(_) | LogicalPlan::Values(_) | LogicalPlan::CreateExternalTable(_) + | LogicalPlan::CreateCatalogSchema(_) | LogicalPlan::DropTable(_) => true, }; if !recurse { @@ -1002,10 +1022,14 @@ impl LogicalPlan { }) => { write!(f, "CreateMemoryTable: {:?}", name) } - LogicalPlan::DropTable(DropTable { - name, if_exists, .. + LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { + schema_name, + .. }) => { - write!(f, "DropTable: {:?} if not exist:={}", name, if_exists) + write!(f, "CreateCatalogSchema: {:?}", schema_name) + } + LogicalPlan::DropTable(DropTable { name, if_exist, .. }) => { + write!(f, "DropTable: {:?} if not exist:={}", name, if_exist) } LogicalPlan::Explain { .. } => write!(f, "Explain"), LogicalPlan::Analyze { .. } => write!(f, "Analyze"), diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index a5a4246284f6..2b21ee3cdb92 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -29,9 +29,9 @@ use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits}; use crate::logical_plan::Expr::Alias; use crate::logical_plan::{ and, builder::expand_wildcard, col, lit, normalize_col, union_with_alias, Column, - CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable, DFSchema, - DFSchemaRef, DropTable, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, - ToDFSchema, ToStringifiedPlan, + CreateCatalogSchema, CreateExternalTable as PlanCreateExternalTable, + CreateMemoryTable, DFSchema, DFSchemaRef, DropTable, Expr, LogicalPlan, + LogicalPlanBuilder, Operator, PlanType, ToDFSchema, ToStringifiedPlan, }; use crate::optimizer::utils::exprlist_to_columns; use crate::prelude::JoinType; @@ -172,7 +172,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { "Only `CREATE TABLE table_name AS SELECT ...` statement is supported" .to_string(), )), - + Statement::CreateSchema { + schema_name, + if_not_exists, + } => Ok(LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { + schema_name: schema_name.to_string(), + if_not_exists: *if_not_exists, + schema: Arc::new(DFSchema::empty()), + })), Statement::Drop { object_type: ObjectType::Table, if_exists, From bec0c7abf443dee2f588c3795613c86af8118434 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Wed, 9 Mar 2022 12:04:45 -0500 Subject: [PATCH 2/7] Refactor catalog and schema providers --- datafusion/src/catalog/catalog.rs | 28 +++++++++------ datafusion/src/catalog/information_schema.rs | 18 ++++++++++ datafusion/src/catalog/schema.rs | 1 + datafusion/src/execution/context.rs | 34 +++++++++++++++++-- .../src/optimizer/common_subexpr_eliminate.rs | 1 + .../src/optimizer/projection_push_down.rs | 1 + datafusion/src/optimizer/utils.rs | 3 +- datafusion/src/physical_plan/planner.rs | 9 +++++ 8 files changed, 80 insertions(+), 15 deletions(-) diff --git a/datafusion/src/catalog/catalog.rs b/datafusion/src/catalog/catalog.rs index d5f509f62bcc..35054dcb4292 100644 --- a/datafusion/src/catalog/catalog.rs +++ b/datafusion/src/catalog/catalog.rs @@ -108,6 +108,14 @@ pub trait CatalogProvider: Sync + Send { /// Retrieves a specific schema from the catalog by name, provided it exists. fn schema(&self, name: &str) -> Option>; + + /// Adds a new schema to this catalog. + /// If a schema of the same name existed before, it is replaced in the catalog and returned. + fn register_schema( + &self, + name: &str, + schema: Arc, + ) -> Option>; } /// Simple in-memory implementation of a catalog. @@ -122,17 +130,6 @@ impl MemoryCatalogProvider { schemas: RwLock::new(HashMap::new()), } } - - /// Adds a new schema to this catalog. - /// If a schema of the same name existed before, it is replaced in the catalog and returned. - pub fn register_schema( - &self, - name: impl Into, - schema: Arc, - ) -> Option> { - let mut schemas = self.schemas.write(); - schemas.insert(name.into(), schema) - } } impl CatalogProvider for MemoryCatalogProvider { @@ -149,4 +146,13 @@ impl CatalogProvider for MemoryCatalogProvider { let schemas = self.schemas.read(); schemas.get(name).cloned() } + + fn register_schema( + &self, + name: &str, + schema: Arc, + ) -> Option> { + let mut schemas = self.schemas.write(); + schemas.insert(name.into(), schema) + } } diff --git a/datafusion/src/catalog/information_schema.rs b/datafusion/src/catalog/information_schema.rs index 2fbf82556375..56d350bed453 100644 --- a/datafusion/src/catalog/information_schema.rs +++ b/datafusion/src/catalog/information_schema.rs @@ -84,6 +84,24 @@ impl CatalogProvider for CatalogWithInformationSchema { self.inner.schema(name) } } + + fn register_schema( + &self, + name: &str, + schema: Arc, + ) -> Option> { + let catalog_list = self.catalog_list.upgrade(); + match catalog_list { + Some(cl) => { + let catalog = cl.catalog(name); + match catalog { + Some(c) => c.register_schema(name, schema), + None => None, + } + } + None => None, + } + } } /// Implements the `information_schema` virtual schema and tables diff --git a/datafusion/src/catalog/schema.rs b/datafusion/src/catalog/schema.rs index a97590af216e..dd5c93c57791 100644 --- a/datafusion/src/catalog/schema.rs +++ b/datafusion/src/catalog/schema.rs @@ -245,6 +245,7 @@ mod tests { use arrow::datatypes::Schema; use crate::assert_batches_eq; + use crate::catalog::catalog::CatalogProvider; use crate::catalog::catalog::MemoryCatalogProvider; use crate::catalog::schema::{ MemorySchemaProvider, ObjectStoreSchemaProvider, SchemaProvider, diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 49644c11bb6b..90f73bada1c3 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -59,8 +59,8 @@ use crate::datasource::object_store::{ObjectStore, ObjectStoreRegistry}; use crate::datasource::TableProvider; use crate::error::{DataFusionError, Result}; use crate::logical_plan::{ - CreateExternalTable, CreateMemoryTable, DropTable, FunctionRegistry, LogicalPlan, - LogicalPlanBuilder, UNNAMED_TABLE, + CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, DropTable, + FunctionRegistry, LogicalPlan, LogicalPlanBuilder, UNNAMED_TABLE, }; use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate; use crate::optimizer::filter_push_down::FilterPushDown; @@ -165,7 +165,7 @@ impl ExecutionContext { let default_catalog = MemoryCatalogProvider::new(); default_catalog.register_schema( - config.default_schema.clone(), + config.default_schema.as_str(), Arc::new(MemorySchemaProvider::new()), ); @@ -286,6 +286,34 @@ impl ExecutionContext { Ok(Arc::new(DataFrame::new(self.state.clone(), &plan))) } } + LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { + schema_name, + if_not_exists, + .. + }) => { + // sqlparser doesnt accept database / catalog as parameter to CREATE SCHEMA + // so for now, we default to "public" schema + let catalog = self.catalog("public"); + match catalog { + Some(c) => { + if if_not_exists {} + if let Some(s) = c.schema(&schema_name) { + Err(DataFusionError::Execution(format!( + "Schema {:?} already exists", + schema_name + ))) + }; + + let schema = Arc::new(MemorySchemaProvider::new()); + c.register_schema(&schema_name, schema); + let plan = LogicalPlanBuilder::empty(false).build()?; + Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan))) + } + None => Err(DataFusionError::Execution(String::from( + "'public' catalog does not exist", + ))), + } + } plan => Ok(Arc::new(DataFrame::new( self.state.clone(), diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs index dad63c3a2713..1c0cb7310306 100644 --- a/datafusion/src/optimizer/common_subexpr_eliminate.rs +++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs @@ -221,6 +221,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result { // apply the optimization to all inputs of the plan diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs index 7debb7afca99..0a0e1c025553 100644 --- a/datafusion/src/optimizer/projection_push_down.rs +++ b/datafusion/src/optimizer/projection_push_down.rs @@ -442,6 +442,7 @@ fn optimize_plan( | LogicalPlan::Sort { .. } | LogicalPlan::CreateExternalTable(_) | LogicalPlan::CreateMemoryTable(_) + | LogicalPlan::CreateCatalogSchema(_) | LogicalPlan::DropTable(_) | LogicalPlan::CrossJoin(_) | LogicalPlan::Extension { .. } => { diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs index 68e25cd205a9..121f8767441a 100644 --- a/datafusion/src/optimizer/utils.rs +++ b/datafusion/src/optimizer/utils.rs @@ -267,7 +267,8 @@ pub fn from_plan( LogicalPlan::EmptyRelation(_) | LogicalPlan::TableScan { .. } | LogicalPlan::CreateExternalTable(_) - | LogicalPlan::DropTable(_) => { + | LogicalPlan::DropTable(_) + | LogicalPlan::CreateCatalogSchema(_) => { // All of these plan types have no inputs / exprs so should not be called assert!(expr.is_empty(), "{:?} should have no exprs", plan); assert!(inputs.is_empty(), "{:?} should have no inputs", plan); diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index b3bcf37da6e0..9f836bc5cd33 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -804,6 +804,15 @@ impl DefaultPhysicalPlanner { "Unsupported logical plan: CreateExternalTable".to_string(), )) } + LogicalPlan::CreateCatalogSchema(_) => { + // There is no default plan for "CREATE SCHEMA". + // It must be handled at a higher level (so + // that the schema can be registered with + // the context) + Err(DataFusionError::Internal( + "Unsupported logical plan: CreateCatalogSchema".to_string(), + )) + } | LogicalPlan::CreateMemoryTable(_) | LogicalPlan::DropTable (_) => { // Create a dummy exec. Ok(Arc::new(EmptyExec::new( From 0f0d150b1545687223205e298d22dea306a6bb2a Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Thu, 10 Mar 2022 15:14:48 -0500 Subject: [PATCH 3/7] Working on catalog and schema registration within ExecutionContext --- datafusion/src/catalog/mod.rs | 16 ++++- datafusion/src/execution/context.rs | 76 ++++++++++++++++------ datafusion/tests/sql/information_schema.rs | 2 +- 3 files changed, 73 insertions(+), 21 deletions(-) diff --git a/datafusion/src/catalog/mod.rs b/datafusion/src/catalog/mod.rs index 478cdefc0cb7..9b626ced8bc3 100644 --- a/datafusion/src/catalog/mod.rs +++ b/datafusion/src/catalog/mod.rs @@ -105,7 +105,21 @@ impl<'a> TableReference<'a> { impl<'a> From<&'a str> for TableReference<'a> { fn from(s: &'a str) -> Self { - Self::Bare { table: s } + let parts: Vec<&str> = s.split(".").collect(); + + match parts.len() { + 1 => Self::Bare { table: s }, + 2 => Self::Partial { + schema: parts[0], + table: parts[1], + }, + 3 => Self::Full { + catalog: parts[0], + schema: parts[1], + table: parts[2], + }, + _ => Self::Bare { table: s }, + } } } diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 90f73bada1c3..d87d7cc18c19 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -250,7 +250,6 @@ impl ExecutionContext { } else { Some(Arc::new(schema.as_ref().to_owned().into())) }; - self.register_listing_table(name, location, options, provided_schema) .await?; let plan = LogicalPlanBuilder::empty(false).build()?; @@ -292,25 +291,52 @@ impl ExecutionContext { .. }) => { // sqlparser doesnt accept database / catalog as parameter to CREATE SCHEMA - // so for now, we default to "public" schema - let catalog = self.catalog("public"); - match catalog { - Some(c) => { - if if_not_exists {} - if let Some(s) = c.schema(&schema_name) { - Err(DataFusionError::Execution(format!( - "Schema {:?} already exists", - schema_name - ))) - }; + // so for now, we default to "datafusion" catalog + let default_catalog = "datafusion"; + let catalog = self.catalog(default_catalog).ok_or_else(|| { + DataFusionError::Execution(String::from( + "Missing 'datafusion' catalog", + )) + })?; + + let schema = catalog.schema(&schema_name); + match (if_not_exists, schema) { + // + (true, Some(_)) => { + println!("Schema '{:?}' already exists", &schema_name); + let plan = LogicalPlanBuilder::empty(false).build()?; + Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan))) + } + (true, None) | (false, None) => { + println!("Creating schema {:?}", schema_name); let schema = Arc::new(MemorySchemaProvider::new()); - c.register_schema(&schema_name, schema); + let plan = LogicalPlanBuilder::empty(false).build()?; + schema.register_table( + "test".into(), + Arc::new(DataFrameImpl::new(self.state.clone(), &plan)), + )?; + let schem_reg_res = catalog.register_schema(&schema_name, schema); + match schem_reg_res { + Some(_) => { + println!("Existing schema with name") + } + None => { + println!("Succesfully registerd") + } + }; + // println!("Schemas pre reg: {:?}", catalog.schema_names); + self.register_catalog(default_catalog, catalog); + println!( + "Schema names: {:?}", + self.catalog(default_catalog).unwrap().schema_names() + ); let plan = LogicalPlanBuilder::empty(false).build()?; Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan))) } - None => Err(DataFusionError::Execution(String::from( - "'public' catalog does not exist", + (false, Some(_)) => Err(DataFusionError::Execution(format!( + "Schema '{:?}' already exists", + schema_name ))), } } @@ -563,10 +589,18 @@ impl ExecutionContext { let state = self.state.lock(); let catalog = if state.config.information_schema { - Arc::new(CatalogWithInformationSchema::new( - Arc::downgrade(&state.catalog_list), - catalog, - )) + let is = state + .catalog_list + .catalog("datafusion") + .unwrap() + .schema("information_schema"); + match is { + Some(_) => catalog, + None => Arc::new(CatalogWithInformationSchema::new( + Arc::downgrade(&state.catalog_list), + catalog, + )), + } } else { catalog }; @@ -1184,6 +1218,10 @@ impl ExecutionContextState { table_ref: impl Into>, ) -> Result> { let resolved_ref = self.resolve_table_ref(table_ref.into()); + println!( + "Resolved ref: {:?}:{:?}:{:?}", + resolved_ref.catalog, resolved_ref.schema, resolved_ref.table + ); self.catalog_list .catalog(resolved_ref.catalog) diff --git a/datafusion/tests/sql/information_schema.rs b/datafusion/tests/sql/information_schema.rs index d93f0d7328d3..7b4558f63faf 100644 --- a/datafusion/tests/sql/information_schema.rs +++ b/datafusion/tests/sql/information_schema.rs @@ -18,7 +18,7 @@ use async_trait::async_trait; use datafusion::{ catalog::{ - catalog::MemoryCatalogProvider, + catalog::{CatalogProvider, MemoryCatalogProvider}, schema::{MemorySchemaProvider, SchemaProvider}, }, datasource::{TableProvider, TableType}, From 73d01d51ddbf4d125b1b3be15d9a023abf9e8997 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Mon, 14 Mar 2022 12:07:04 -0400 Subject: [PATCH 4/7] Create schema SQL working and added test --- datafusion/src/catalog/information_schema.rs | 13 +---- datafusion/src/catalog/mod.rs | 2 +- datafusion/src/execution/context.rs | 51 +++++++++----------- 3 files changed, 27 insertions(+), 39 deletions(-) diff --git a/datafusion/src/catalog/information_schema.rs b/datafusion/src/catalog/information_schema.rs index 56d350bed453..4a899c91579d 100644 --- a/datafusion/src/catalog/information_schema.rs +++ b/datafusion/src/catalog/information_schema.rs @@ -90,17 +90,8 @@ impl CatalogProvider for CatalogWithInformationSchema { name: &str, schema: Arc, ) -> Option> { - let catalog_list = self.catalog_list.upgrade(); - match catalog_list { - Some(cl) => { - let catalog = cl.catalog(name); - match catalog { - Some(c) => c.register_schema(name, schema), - None => None, - } - } - None => None, - } + let catalog = &self.inner; + catalog.register_schema(name, schema) } } diff --git a/datafusion/src/catalog/mod.rs b/datafusion/src/catalog/mod.rs index 9b626ced8bc3..031cd871043f 100644 --- a/datafusion/src/catalog/mod.rs +++ b/datafusion/src/catalog/mod.rs @@ -105,7 +105,7 @@ impl<'a> TableReference<'a> { impl<'a> From<&'a str> for TableReference<'a> { fn from(s: &'a str) -> Self { - let parts: Vec<&str> = s.split(".").collect(); + let parts: Vec<&str> = s.split('.').collect(); match parts.len() { 1 => Self::Bare { table: s }, diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index d87d7cc18c19..8a2b372a2699 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -302,35 +302,13 @@ impl ExecutionContext { let schema = catalog.schema(&schema_name); match (if_not_exists, schema) { - // (true, Some(_)) => { - println!("Schema '{:?}' already exists", &schema_name); let plan = LogicalPlanBuilder::empty(false).build()?; Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan))) } (true, None) | (false, None) => { - println!("Creating schema {:?}", schema_name); let schema = Arc::new(MemorySchemaProvider::new()); - let plan = LogicalPlanBuilder::empty(false).build()?; - schema.register_table( - "test".into(), - Arc::new(DataFrameImpl::new(self.state.clone(), &plan)), - )?; - let schem_reg_res = catalog.register_schema(&schema_name, schema); - match schem_reg_res { - Some(_) => { - println!("Existing schema with name") - } - None => { - println!("Succesfully registerd") - } - }; - // println!("Schemas pre reg: {:?}", catalog.schema_names); - self.register_catalog(default_catalog, catalog); - println!( - "Schema names: {:?}", - self.catalog(default_catalog).unwrap().schema_names() - ); + catalog.register_schema(&schema_name, schema); let plan = LogicalPlanBuilder::empty(false).build()?; Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan))) } @@ -1218,10 +1196,6 @@ impl ExecutionContextState { table_ref: impl Into>, ) -> Result> { let resolved_ref = self.resolve_table_ref(table_ref.into()); - println!( - "Resolved ref: {:?}:{:?}:{:?}", - resolved_ref.catalog, resolved_ref.schema, resolved_ref.table - ); self.catalog_list .catalog(resolved_ref.catalog) @@ -2973,6 +2947,29 @@ mod tests { assert_eq!(Weak::strong_count(&catalog_weak), 0); } + #[tokio::test] + async fn sql_create_schema() -> Result<()> { + // the information schema used to introduce cyclic Arcs + let mut ctx = ExecutionContext::with_config( + ExecutionConfig::new().with_information_schema(true), + ); + + // Create schema + ctx.sql("CREATE SCHEMA abc").await?.collect().await?; + + // Add table to schema + ctx.sql("CREATE TABLE abc.y AS VALUES (1,2,3)") + .await? + .collect() + .await?; + + // Check table exists in schema + let results = ctx.sql("SELECT * FROM information_schema.tables WHERE table_schema='abc' AND table_name = 'y'").await.unwrap().collect().await.unwrap(); + + assert_eq!(results[0].num_rows(), 1); + Ok(()) + } + #[tokio::test] async fn normalized_column_identifiers() { // create local execution context From bc655170ac374863b16858f3bfbb493fc61e4e0d Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Mon, 14 Mar 2022 12:49:08 -0400 Subject: [PATCH 5/7] Fix tests --- datafusion/src/logical_plan/plan.rs | 6 ++++-- datafusion/src/sql/planner.rs | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index 3f6b2440c8aa..3485977ea6c0 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -1028,8 +1028,10 @@ impl LogicalPlan { }) => { write!(f, "CreateCatalogSchema: {:?}", schema_name) } - LogicalPlan::DropTable(DropTable { name, if_exist, .. }) => { - write!(f, "DropTable: {:?} if not exist:={}", name, if_exist) + LogicalPlan::DropTable(DropTable { + name, if_exists, .. + }) => { + write!(f, "DropTable: {:?} if not exist:={}", name, if_exists) } LogicalPlan::Explain { .. } => write!(f, "Explain"), LogicalPlan::Analyze { .. } => write!(f, "Analyze"), diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index 2b21ee3cdb92..61b03b20bd32 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -177,7 +177,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { if_not_exists, } => Ok(LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { schema_name: schema_name.to_string(), - if_not_exists: *if_not_exists, + if_not_exists, schema: Arc::new(DFSchema::empty()), })), Statement::Drop { From 877ccd6700fea51583e6d2291a7232efc5f2d0c7 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Tue, 15 Mar 2022 09:48:21 -0400 Subject: [PATCH 6/7] Ballista updates --- ballista/rust/core/proto/ballista.proto | 7 +++++ .../rust/core/src/serde/logical_plan/mod.rs | 30 +++++++++++++++++-- datafusion/src/logical_plan/plan.rs | 2 +- 3 files changed, 36 insertions(+), 3 deletions(-) diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index a835229a6057..5bb12890ccc8 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -49,6 +49,7 @@ message LogicalPlanNode { CrossJoinNode cross_join = 15; ValuesNode values = 16; LogicalExtensionNode extension = 17; + CreateCatalogSchemaNode create_catalog_schema = 18; } } @@ -146,6 +147,12 @@ message CreateExternalTableNode { datafusion.DfSchema schema = 5; } +message CreateCatalogSchemaNode { + string schema_name = 1; + bool if_not_exists = 2; + datafusion.DfSchema schema = 3; +} + // a node containing data for defining values list. unlike in SQL where it's two dimensional, here // the list is flattened, and with the field n_cols it can be parsed and partitioned into rows message ValuesNode { diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs index 4970cd600a5a..eb61f65649e6 100644 --- a/ballista/rust/core/src/serde/logical_plan/mod.rs +++ b/ballista/rust/core/src/serde/logical_plan/mod.rs @@ -33,8 +33,8 @@ use datafusion::logical_plan::plan::{ Aggregate, EmptyRelation, Filter, Join, Projection, Sort, Window, }; use datafusion::logical_plan::{ - Column, CreateExternalTable, CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan, - LogicalPlanBuilder, Repartition, TableScan, Values, + Column, CreateCatalogSchema, CreateExternalTable, CrossJoin, Expr, JoinConstraint, + Limit, LogicalPlan, LogicalPlanBuilder, Repartition, TableScan, Values, }; use datafusion::prelude::ExecutionContext; @@ -322,6 +322,19 @@ impl AsLogicalPlan for LogicalPlanNode { has_header: create_extern_table.has_header, })) } + LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => { + let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| { + BallistaError::General(String::from( + "Protobuf deserialization error, CreateCatalogSchemaNode was missing required field schema.", + )) + })?; + + Ok(LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { + schema_name: create_catalog_schema.schema_name.clone(), + if_not_exists: create_catalog_schema.if_not_exists, + schema: pb_schema.try_into()?, + })) + } LogicalPlanType::Analyze(analyze) => { let input: LogicalPlan = into_logical_plan!(analyze.input, &ctx, extension_codec)?; @@ -755,6 +768,19 @@ impl AsLogicalPlan for LogicalPlanNode { )), }) } + LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { + schema_name, + if_not_exists, + schema: df_schema, + }) => Ok(protobuf::LogicalPlanNode { + logical_plan_type: Some(LogicalPlanType::CreateCatalogSchema( + protobuf::CreateCatalogSchemaNode { + schema_name: schema_name.clone(), + if_not_exists: *if_not_exists, + schema: Some(df_schema.into()), + }, + )), + }), LogicalPlan::Analyze(a) => { let input = protobuf::LogicalPlanNode::try_from_logical_plan( a.input.as_ref(), diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index 3485977ea6c0..5fc91b6e1a05 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -357,7 +357,7 @@ pub enum LogicalPlan { CreateExternalTable(CreateExternalTable), /// Creates an in memory table. CreateMemoryTable(CreateMemoryTable), - /// Creates an in memory table. + /// Creates a new catalog schema. CreateCatalogSchema(CreateCatalogSchema), /// Drops a table. DropTable(DropTable), From 71d5a4633c08e751d3da279c7c15aef0ce1184af Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Tue, 15 Mar 2022 11:57:21 -0400 Subject: [PATCH 7/7] Rebase remainder --- datafusion/src/execution/context.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 8a2b372a2699..381a238ef255 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -304,13 +304,13 @@ impl ExecutionContext { match (if_not_exists, schema) { (true, Some(_)) => { let plan = LogicalPlanBuilder::empty(false).build()?; - Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan))) + Ok(Arc::new(DataFrame::new(self.state.clone(), &plan))) } (true, None) | (false, None) => { let schema = Arc::new(MemorySchemaProvider::new()); catalog.register_schema(&schema_name, schema); let plan = LogicalPlanBuilder::empty(false).build()?; - Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan))) + Ok(Arc::new(DataFrame::new(self.state.clone(), &plan))) } (false, Some(_)) => Err(DataFusionError::Execution(format!( "Schema '{:?}' already exists",