Skip to content

Commit 4e44f4d

Browse files
Create schema SQL working and added test
1 parent 30cd04c commit 4e44f4d

File tree

3 files changed

+27
-39
lines changed

3 files changed

+27
-39
lines changed

datafusion/src/catalog/information_schema.rs

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -90,17 +90,8 @@ impl CatalogProvider for CatalogWithInformationSchema {
9090
name: &str,
9191
schema: Arc<dyn SchemaProvider>,
9292
) -> Option<Arc<dyn SchemaProvider>> {
93-
let catalog_list = self.catalog_list.upgrade();
94-
match catalog_list {
95-
Some(cl) => {
96-
let catalog = cl.catalog(name);
97-
match catalog {
98-
Some(c) => c.register_schema(name, schema),
99-
None => None,
100-
}
101-
}
102-
None => None,
103-
}
93+
let catalog = &self.inner;
94+
catalog.register_schema(name, schema)
10495
}
10596
}
10697

datafusion/src/catalog/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ impl<'a> TableReference<'a> {
105105

106106
impl<'a> From<&'a str> for TableReference<'a> {
107107
fn from(s: &'a str) -> Self {
108-
let parts: Vec<&str> = s.split(".").collect();
108+
let parts: Vec<&str> = s.split('.').collect();
109109

110110
match parts.len() {
111111
1 => Self::Bare { table: s },

datafusion/src/execution/context.rs

Lines changed: 24 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -299,35 +299,13 @@ impl ExecutionContext {
299299
let schema = catalog.schema(&schema_name);
300300

301301
match (if_not_exists, schema) {
302-
//
303302
(true, Some(_)) => {
304-
println!("Schema '{:?}' already exists", &schema_name);
305303
let plan = LogicalPlanBuilder::empty(false).build()?;
306304
Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
307305
}
308306
(true, None) | (false, None) => {
309-
println!("Creating schema {:?}", schema_name);
310307
let schema = Arc::new(MemorySchemaProvider::new());
311-
let plan = LogicalPlanBuilder::empty(false).build()?;
312-
schema.register_table(
313-
"test".into(),
314-
Arc::new(DataFrameImpl::new(self.state.clone(), &plan)),
315-
)?;
316-
let schem_reg_res = catalog.register_schema(&schema_name, schema);
317-
match schem_reg_res {
318-
Some(_) => {
319-
println!("Existing schema with name")
320-
}
321-
None => {
322-
println!("Succesfully registerd")
323-
}
324-
};
325-
// println!("Schemas pre reg: {:?}", catalog.schema_names);
326-
self.register_catalog(default_catalog, catalog);
327-
println!(
328-
"Schema names: {:?}",
329-
self.catalog(default_catalog).unwrap().schema_names()
330-
);
308+
catalog.register_schema(&schema_name, schema);
331309
let plan = LogicalPlanBuilder::empty(false).build()?;
332310
Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
333311
}
@@ -1217,10 +1195,6 @@ impl ExecutionContextState {
12171195
table_ref: impl Into<TableReference<'a>>,
12181196
) -> Result<Arc<dyn SchemaProvider>> {
12191197
let resolved_ref = self.resolve_table_ref(table_ref.into());
1220-
println!(
1221-
"Resolved ref: {:?}:{:?}:{:?}",
1222-
resolved_ref.catalog, resolved_ref.schema, resolved_ref.table
1223-
);
12241198

12251199
self.catalog_list
12261200
.catalog(resolved_ref.catalog)
@@ -2954,6 +2928,29 @@ mod tests {
29542928
assert_eq!(Weak::strong_count(&catalog_weak), 0);
29552929
}
29562930

2931+
#[tokio::test]
2932+
async fn sql_create_schema() -> Result<()> {
2933+
// the information schema used to introduce cyclic Arcs
2934+
let mut ctx = ExecutionContext::with_config(
2935+
ExecutionConfig::new().with_information_schema(true),
2936+
);
2937+
2938+
// Create schema
2939+
ctx.sql("CREATE SCHEMA abc").await?.collect().await?;
2940+
2941+
// Add table to schema
2942+
ctx.sql("CREATE TABLE abc.y AS VALUES (1,2,3)")
2943+
.await?
2944+
.collect()
2945+
.await?;
2946+
2947+
// Check table exists in schema
2948+
let results = ctx.sql("SELECT * FROM information_schema.tables WHERE table_schema='abc' AND table_name = 'y'").await.unwrap().collect().await.unwrap();
2949+
2950+
assert_eq!(results[0].num_rows(), 1);
2951+
Ok(())
2952+
}
2953+
29572954
#[tokio::test]
29582955
async fn normalized_column_identifiers() {
29592956
// create local execution context

0 commit comments

Comments
 (0)