Skip to content

Add Create Schema functionality in SQL #1959

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Mar 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ message LogicalPlanNode {
CrossJoinNode cross_join = 15;
ValuesNode values = 16;
LogicalExtensionNode extension = 17;
CreateCatalogSchemaNode create_catalog_schema = 18;
}
}

Expand Down Expand Up @@ -146,6 +147,12 @@ message CreateExternalTableNode {
datafusion.DfSchema schema = 5;
}

// Serialized form of the `CreateCatalogSchema` logical plan node (SQL `CREATE SCHEMA`).
message CreateCatalogSchemaNode {
// Name of the schema to create.
string schema_name = 1;
// Carries the plan node's `if_not_exists` flag.
bool if_not_exists = 2;
// Output schema of the plan node. Deserialization into a logical plan
// fails with an error when this field is absent.
datafusion.DfSchema schema = 3;
}

// a node containing data for defining values list. unlike in SQL where it's two dimensional, here
// the list is flattened, and with the field n_cols it can be parsed and partitioned into rows
message ValuesNode {
Expand Down
30 changes: 28 additions & 2 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use datafusion::logical_plan::plan::{
Aggregate, EmptyRelation, Filter, Join, Projection, Sort, Window,
};
use datafusion::logical_plan::{
Column, CreateExternalTable, CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan,
LogicalPlanBuilder, Repartition, TableScan, Values,
Column, CreateCatalogSchema, CreateExternalTable, CrossJoin, Expr, JoinConstraint,
Limit, LogicalPlan, LogicalPlanBuilder, Repartition, TableScan, Values,
};
use datafusion::prelude::ExecutionContext;

Expand Down Expand Up @@ -322,6 +322,19 @@ impl AsLogicalPlan for LogicalPlanNode {
has_header: create_extern_table.has_header,
}))
}
LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| {
BallistaError::General(String::from(
"Protobuf deserialization error, CreateCatalogSchemaNode was missing required field schema.",
))
})?;

Ok(LogicalPlan::CreateCatalogSchema(CreateCatalogSchema {
schema_name: create_catalog_schema.schema_name.clone(),
if_not_exists: create_catalog_schema.if_not_exists,
schema: pb_schema.try_into()?,
}))
}
LogicalPlanType::Analyze(analyze) => {
let input: LogicalPlan =
into_logical_plan!(analyze.input, &ctx, extension_codec)?;
Expand Down Expand Up @@ -755,6 +768,19 @@ impl AsLogicalPlan for LogicalPlanNode {
)),
})
}
LogicalPlan::CreateCatalogSchema(CreateCatalogSchema {
schema_name,
if_not_exists,
schema: df_schema,
}) => Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::CreateCatalogSchema(
protobuf::CreateCatalogSchemaNode {
schema_name: schema_name.clone(),
if_not_exists: *if_not_exists,
schema: Some(df_schema.into()),
},
)),
}),
LogicalPlan::Analyze(a) => {
let input = protobuf::LogicalPlanNode::try_from_logical_plan(
a.input.as_ref(),
Expand Down
28 changes: 17 additions & 11 deletions datafusion/src/catalog/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ pub trait CatalogProvider: Sync + Send {

/// Retrieves a specific schema from the catalog by name, provided it exists.
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>>;

/// Adds a new schema to this catalog under `name`.
///
/// If a schema of the same name existed before, it is replaced in the catalog and returned.
fn register_schema(
&self,
name: &str,
schema: Arc<dyn SchemaProvider>,
) -> Option<Arc<dyn SchemaProvider>>;
}

/// Simple in-memory implementation of a catalog.
Expand All @@ -122,17 +130,6 @@ impl MemoryCatalogProvider {
schemas: RwLock::new(HashMap::new()),
}
}

/// Registers `schema` in this catalog under `name`.
/// A schema previously stored under the same name is replaced and handed back to the caller.
pub fn register_schema(
    &self,
    name: impl Into<String>,
    schema: Arc<dyn SchemaProvider>,
) -> Option<Arc<dyn SchemaProvider>> {
    // The write guard is a temporary that lives only for this insert.
    self.schemas.write().insert(name.into(), schema)
}
}

impl CatalogProvider for MemoryCatalogProvider {
Expand All @@ -149,4 +146,13 @@ impl CatalogProvider for MemoryCatalogProvider {
let schemas = self.schemas.read();
schemas.get(name).cloned()
}

/// Stores `schema` under `name` in the in-memory map and returns the
/// schema that was previously registered under that name, if any.
fn register_schema(
    &self,
    name: &str,
    schema: Arc<dyn SchemaProvider>,
) -> Option<Arc<dyn SchemaProvider>> {
    // The write guard is a temporary that lives only for this insert.
    self.schemas.write().insert(name.to_owned(), schema)
}
}
9 changes: 9 additions & 0 deletions datafusion/src/catalog/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ impl CatalogProvider for CatalogWithInformationSchema {
self.inner.schema(name)
}
}

/// Forwards the registration to the wrapped catalog and returns its result unchanged.
fn register_schema(
    &self,
    name: &str,
    schema: Arc<dyn SchemaProvider>,
) -> Option<Arc<dyn SchemaProvider>> {
    self.inner.register_schema(name, schema)
}
}

/// Implements the `information_schema` virtual schema and tables
Expand Down
16 changes: 15 additions & 1 deletion datafusion/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,21 @@ impl<'a> TableReference<'a> {

impl<'a> From<&'a str> for TableReference<'a> {
/// Builds a table reference from a possibly dot-qualified name.
///
/// The input is split on `.` and classified by the number of segments:
///
/// * two segments (`schema.table`) produce `Partial`;
/// * three segments (`catalog.schema.table`) produce `Full`;
/// * anything else (a single segment, or more than three) produces `Bare`
///   with the entire, unsplit input as the table name.
///
/// No quoting or escaping is interpreted: every `.` acts as a separator.
fn from(s: &'a str) -> Self {
    let parts: Vec<&str> = s.split('.').collect();

    // Slice patterns dispatch on the segment count without manual indexing,
    // so there is no out-of-bounds path to reason about.
    match *parts.as_slice() {
        [schema, table] => Self::Partial { schema, table },
        [catalog, schema, table] => Self::Full {
            catalog,
            schema,
            table,
        },
        _ => Self::Bare { table: s },
    }
}
Copy link
Contributor

@doki23 doki23 Mar 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍🏻! It's helpful to me!

}

Expand Down
1 change: 1 addition & 0 deletions datafusion/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ mod tests {
use arrow::datatypes::Schema;

use crate::assert_batches_eq;
use crate::catalog::catalog::CatalogProvider;
use crate::catalog::catalog::MemoryCatalogProvider;
use crate::catalog::schema::{
MemorySchemaProvider, ObjectStoreSchemaProvider, SchemaProvider,
Expand Down
79 changes: 71 additions & 8 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ use crate::datasource::object_store::{ObjectStore, ObjectStoreRegistry};
use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
use crate::logical_plan::{
CreateExternalTable, CreateMemoryTable, DropTable, FunctionRegistry, LogicalPlan,
LogicalPlanBuilder, UNNAMED_TABLE,
CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, DropTable,
FunctionRegistry, LogicalPlan, LogicalPlanBuilder, UNNAMED_TABLE,
};
use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
use crate::optimizer::filter_push_down::FilterPushDown;
Expand Down Expand Up @@ -165,7 +165,7 @@ impl ExecutionContext {
let default_catalog = MemoryCatalogProvider::new();

default_catalog.register_schema(
config.default_schema.clone(),
config.default_schema.as_str(),
Arc::new(MemorySchemaProvider::new()),
);

Expand Down Expand Up @@ -250,7 +250,6 @@ impl ExecutionContext {
} else {
Some(Arc::new(schema.as_ref().to_owned().into()))
};

self.register_listing_table(name, location, options, provided_schema)
.await?;
let plan = LogicalPlanBuilder::empty(false).build()?;
Expand Down Expand Up @@ -286,6 +285,39 @@ impl ExecutionContext {
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
}
}
LogicalPlan::CreateCatalogSchema(CreateCatalogSchema {
schema_name,
if_not_exists,
..
}) => {
// sqlparser doesn't accept a database / catalog as a parameter to CREATE SCHEMA
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe worth a ticket to sqlparser to support this? Or maybe just a PR :)

// so for now, we default to "datafusion" catalog
let default_catalog = "datafusion";
let catalog = self.catalog(default_catalog).ok_or_else(|| {
DataFusionError::Execution(String::from(
"Missing 'datafusion' catalog",
))
})?;

let schema = catalog.schema(&schema_name);

match (if_not_exists, schema) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

(true, Some(_)) => {
let plan = LogicalPlanBuilder::empty(false).build()?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
}
(true, None) | (false, None) => {
let schema = Arc::new(MemorySchemaProvider::new());
catalog.register_schema(&schema_name, schema);
let plan = LogicalPlanBuilder::empty(false).build()?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
}
(false, Some(_)) => Err(DataFusionError::Execution(format!(
"Schema '{:?}' already exists",
schema_name
))),
}
}

plan => Ok(Arc::new(DataFrame::new(
self.state.clone(),
Expand Down Expand Up @@ -535,10 +567,18 @@ impl ExecutionContext {

let state = self.state.lock();
let catalog = if state.config.information_schema {
Arc::new(CatalogWithInformationSchema::new(
Arc::downgrade(&state.catalog_list),
catalog,
))
let is = state
.catalog_list
.catalog("datafusion")
.unwrap()
.schema("information_schema");
match is {
Some(_) => catalog,
None => Arc::new(CatalogWithInformationSchema::new(
Arc::downgrade(&state.catalog_list),
catalog,
)),
}
} else {
catalog
};
Expand Down Expand Up @@ -2907,6 +2947,29 @@ mod tests {
assert_eq!(Weak::strong_count(&catalog_weak), 0);
}

#[tokio::test]
async fn sql_create_schema() -> Result<()> {
// enable information_schema so the created schema and table can be queried below
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure what this comment means

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops that may have been straggling comment from a copy paste, sry about that. i can remove in subsequent PR

let mut ctx = ExecutionContext::with_config(
ExecutionConfig::new().with_information_schema(true),
);

// Create schema
ctx.sql("CREATE SCHEMA abc").await?.collect().await?;

// Add table to schema
ctx.sql("CREATE TABLE abc.y AS VALUES (1,2,3)")
.await?
.collect()
.await?;

// Check table exists in schema
let results = ctx.sql("SELECT * FROM information_schema.tables WHERE table_schema='abc' AND table_name = 'y'").await.unwrap().collect().await.unwrap();

assert_eq!(results[0].num_rows(), 1);
Ok(())
}

#[tokio::test]
async fn normalized_column_identifiers() {
// create local execution context
Expand Down
6 changes: 3 additions & 3 deletions datafusion/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ pub use expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion};
pub use extension::UserDefinedLogicalNode;
pub use operators::Operator;
pub use plan::{
CreateExternalTable, CreateMemoryTable, CrossJoin, DropTable, EmptyRelation,
JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, PlanVisitor,
Repartition, TableScan, Union, Values,
CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, CrossJoin, DropTable,
EmptyRelation, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType,
PlanVisitor, Repartition, TableScan, Union, Values,
};
pub(crate) use plan::{StringifiedPlan, ToStringifiedPlan};
pub use registry::FunctionRegistry;
28 changes: 27 additions & 1 deletion datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,17 @@ pub struct CreateExternalTable {
pub has_header: bool,
}

/// Creates a schema.
#[derive(Clone)]
pub struct CreateCatalogSchema {
/// The name of the schema to create
pub schema_name: String,
/// When true, creating a schema that already exists succeeds without
/// changes instead of returning an error
pub if_not_exists: bool,
/// Empty schema
pub schema: DFSchemaRef,
}

/// Drops a table.
#[derive(Clone)]
pub struct DropTable {
Expand Down Expand Up @@ -346,6 +357,8 @@ pub enum LogicalPlan {
CreateExternalTable(CreateExternalTable),
/// Creates an in memory table.
CreateMemoryTable(CreateMemoryTable),
/// Creates a new catalog schema.
CreateCatalogSchema(CreateCatalogSchema),
/// Drops a table.
DropTable(DropTable),
/// Values expression. See
Expand Down Expand Up @@ -390,6 +403,9 @@ impl LogicalPlan {
LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
input.schema()
}
LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { schema, .. }) => {
schema
}
LogicalPlan::DropTable(DropTable { schema, .. }) => schema,
}
}
Expand Down Expand Up @@ -431,7 +447,8 @@ impl LogicalPlan {
LogicalPlan::Explain(Explain { schema, .. })
| LogicalPlan::Analyze(Analyze { schema, .. })
| LogicalPlan::EmptyRelation(EmptyRelation { schema, .. })
| LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => {
| LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. })
| LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { schema, .. }) => {
vec![schema]
}
LogicalPlan::Limit(Limit { input, .. })
Expand Down Expand Up @@ -486,6 +503,7 @@ impl LogicalPlan {
| LogicalPlan::Limit(_)
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateMemoryTable(_)
| LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::CrossJoin(_)
| LogicalPlan::Analyze { .. }
Expand Down Expand Up @@ -521,6 +539,7 @@ impl LogicalPlan {
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::DropTable(_) => vec![],
}
}
Expand Down Expand Up @@ -673,6 +692,7 @@ impl LogicalPlan {
| LogicalPlan::EmptyRelation(_)
| LogicalPlan::Values(_)
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::DropTable(_) => true,
};
if !recurse {
Expand Down Expand Up @@ -1002,6 +1022,12 @@ impl LogicalPlan {
}) => {
write!(f, "CreateMemoryTable: {:?}", name)
}
LogicalPlan::CreateCatalogSchema(CreateCatalogSchema {
schema_name,
..
}) => {
write!(f, "CreateCatalogSchema: {:?}", schema_name)
}
LogicalPlan::DropTable(DropTable {
name, if_exists, ..
}) => {
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/optimizer/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
| LogicalPlan::Explain { .. }
| LogicalPlan::Analyze { .. }
| LogicalPlan::CreateMemoryTable(_)
| LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::Extension { .. } => {
// apply the optimization to all inputs of the plan
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ fn optimize_plan(
| LogicalPlan::Sort { .. }
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateMemoryTable(_)
| LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::CrossJoin(_)
| LogicalPlan::Extension { .. } => {
Expand Down
Loading