Skip to content

Commit ff3aabb

Browse files
Create schema SQL working and added test
1 parent 851b2ef commit ff3aabb

3 files changed

Lines changed: 27 additions & 39 deletions

File tree

datafusion/src/catalog/information_schema.rs

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -90,17 +90,8 @@ impl CatalogProvider for CatalogWithInformationSchema {
9090
name: &str,
9191
schema: Arc<dyn SchemaProvider>,
9292
) -> Option<Arc<dyn SchemaProvider>> {
93-
let catalog_list = self.catalog_list.upgrade();
94-
match catalog_list {
95-
Some(cl) => {
96-
let catalog = cl.catalog(name);
97-
match catalog {
98-
Some(c) => c.register_schema(name, schema),
99-
None => None,
100-
}
101-
}
102-
None => None,
103-
}
93+
let catalog = &self.inner;
94+
catalog.register_schema(name, schema)
10495
}
10596
}
10697

datafusion/src/catalog/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ impl<'a> TableReference<'a> {
105105

106106
impl<'a> From<&'a str> for TableReference<'a> {
107107
fn from(s: &'a str) -> Self {
108-
let parts: Vec<&str> = s.split(".").collect();
108+
let parts: Vec<&str> = s.split('.').collect();
109109

110110
match parts.len() {
111111
1 => Self::Bare { table: s },

datafusion/src/execution/context.rs

Lines changed: 24 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -302,35 +302,13 @@ impl ExecutionContext {
302302
let schema = catalog.schema(&schema_name);
303303

304304
match (if_not_exists, schema) {
305-
//
306305
(true, Some(_)) => {
307-
println!("Schema '{:?}' already exists", &schema_name);
308306
let plan = LogicalPlanBuilder::empty(false).build()?;
309307
Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
310308
}
311309
(true, None) | (false, None) => {
312-
println!("Creating schema {:?}", schema_name);
313310
let schema = Arc::new(MemorySchemaProvider::new());
314-
let plan = LogicalPlanBuilder::empty(false).build()?;
315-
schema.register_table(
316-
"test".into(),
317-
Arc::new(DataFrameImpl::new(self.state.clone(), &plan)),
318-
)?;
319-
let schem_reg_res = catalog.register_schema(&schema_name, schema);
320-
match schem_reg_res {
321-
Some(_) => {
322-
println!("Existing schema with name")
323-
}
324-
None => {
325-
println!("Succesfully registerd")
326-
}
327-
};
328-
// println!("Schemas pre reg: {:?}", catalog.schema_names);
329-
self.register_catalog(default_catalog, catalog);
330-
println!(
331-
"Schema names: {:?}",
332-
self.catalog(default_catalog).unwrap().schema_names()
333-
);
311+
catalog.register_schema(&schema_name, schema);
334312
let plan = LogicalPlanBuilder::empty(false).build()?;
335313
Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
336314
}
@@ -1221,10 +1199,6 @@ impl ExecutionContextState {
12211199
table_ref: impl Into<TableReference<'a>>,
12221200
) -> Result<Arc<dyn SchemaProvider>> {
12231201
let resolved_ref = self.resolve_table_ref(table_ref.into());
1224-
println!(
1225-
"Resolved ref: {:?}:{:?}:{:?}",
1226-
resolved_ref.catalog, resolved_ref.schema, resolved_ref.table
1227-
);
12281202

12291203
self.catalog_list
12301204
.catalog(resolved_ref.catalog)
@@ -2976,6 +2950,29 @@ mod tests {
29762950
assert_eq!(Weak::strong_count(&catalog_weak), 0);
29772951
}
29782952

2953+
#[tokio::test]
2954+
async fn sql_create_schema() -> Result<()> {
2955+
// the information schema used to introduce cyclic Arcs
2956+
let mut ctx = ExecutionContext::with_config(
2957+
ExecutionConfig::new().with_information_schema(true),
2958+
);
2959+
2960+
// Create schema
2961+
ctx.sql("CREATE SCHEMA abc").await?.collect().await?;
2962+
2963+
// Add table to schema
2964+
ctx.sql("CREATE TABLE abc.y AS VALUES (1,2,3)")
2965+
.await?
2966+
.collect()
2967+
.await?;
2968+
2969+
// Check table exists in schema
2970+
let results = ctx.sql("SELECT * FROM information_schema.tables WHERE table_schema='abc' AND table_name = 'y'").await.unwrap().collect().await.unwrap();
2971+
2972+
assert_eq!(results[0].num_rows(), 1);
2973+
Ok(())
2974+
}
2975+
29792976
#[tokio::test]
29802977
async fn normalized_column_identifiers() {
29812978
// create local execution context

0 commit comments

Comments
 (0)