Skip to content

Commit 30cd04c

Browse files
Working on catalog and schema registration within ExecutionContext
1 parent a6b588f commit 30cd04c

File tree

3 files changed

+73
-21
lines changed

3 files changed

+73
-21
lines changed

datafusion/src/catalog/mod.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,21 @@ impl<'a> TableReference<'a> {
105105

106106
impl<'a> From<&'a str> for TableReference<'a> {
107107
fn from(s: &'a str) -> Self {
108-
Self::Bare { table: s }
108+
let parts: Vec<&str> = s.split(".").collect();
109+
110+
match parts.len() {
111+
1 => Self::Bare { table: s },
112+
2 => Self::Partial {
113+
schema: parts[0],
114+
table: parts[1],
115+
},
116+
3 => Self::Full {
117+
catalog: parts[0],
118+
schema: parts[1],
119+
table: parts[2],
120+
},
121+
_ => Self::Bare { table: s },
122+
}
109123
}
110124
}
111125

datafusion/src/execution/context.rs

Lines changed: 57 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,6 @@ impl ExecutionContext {
249249
} else {
250250
Some(Arc::new(schema.as_ref().to_owned().into()))
251251
};
252-
253252
self.register_listing_table(name, location, options, provided_schema)
254253
.await?;
255254
let plan = LogicalPlanBuilder::empty(false).build()?;
@@ -289,25 +288,52 @@ impl ExecutionContext {
289288
..
290289
}) => {
291290
// sqlparser doesnt accept database / catalog as parameter to CREATE SCHEMA
292-
// so for now, we default to "public" schema
293-
let catalog = self.catalog("public");
294-
match catalog {
295-
Some(c) => {
296-
if if_not_exists {}
297-
if let Some(s) = c.schema(&schema_name) {
298-
Err(DataFusionError::Execution(format!(
299-
"Schema {:?} already exists",
300-
schema_name
301-
)))
302-
};
291+
// so for now, we default to "datafusion" catalog
292+
let default_catalog = "datafusion";
293+
let catalog = self.catalog(default_catalog).ok_or_else(|| {
294+
DataFusionError::Execution(String::from(
295+
"Missing 'datafusion' catalog",
296+
))
297+
})?;
298+
299+
let schema = catalog.schema(&schema_name);
303300

301+
match (if_not_exists, schema) {
302+
//
303+
(true, Some(_)) => {
304+
println!("Schema '{:?}' already exists", &schema_name);
305+
let plan = LogicalPlanBuilder::empty(false).build()?;
306+
Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
307+
}
308+
(true, None) | (false, None) => {
309+
println!("Creating schema {:?}", schema_name);
304310
let schema = Arc::new(MemorySchemaProvider::new());
305-
c.register_schema(&schema_name, schema);
311+
let plan = LogicalPlanBuilder::empty(false).build()?;
312+
schema.register_table(
313+
"test".into(),
314+
Arc::new(DataFrameImpl::new(self.state.clone(), &plan)),
315+
)?;
316+
let schem_reg_res = catalog.register_schema(&schema_name, schema);
317+
match schem_reg_res {
318+
Some(_) => {
319+
println!("Existing schema with name")
320+
}
321+
None => {
322+
println!("Succesfully registerd")
323+
}
324+
};
325+
// println!("Schemas pre reg: {:?}", catalog.schema_names);
326+
self.register_catalog(default_catalog, catalog);
327+
println!(
328+
"Schema names: {:?}",
329+
self.catalog(default_catalog).unwrap().schema_names()
330+
);
306331
let plan = LogicalPlanBuilder::empty(false).build()?;
307332
Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
308333
}
309-
None => Err(DataFusionError::Execution(String::from(
310-
"'public' catalog does not exist",
334+
(false, Some(_)) => Err(DataFusionError::Execution(format!(
335+
"Schema '{:?}' already exists",
336+
schema_name
311337
))),
312338
}
313339
}
@@ -563,10 +589,18 @@ impl ExecutionContext {
563589

564590
let state = self.state.lock();
565591
let catalog = if state.config.information_schema {
566-
Arc::new(CatalogWithInformationSchema::new(
567-
Arc::downgrade(&state.catalog_list),
568-
catalog,
569-
))
592+
let is = state
593+
.catalog_list
594+
.catalog("datafusion")
595+
.unwrap()
596+
.schema("information_schema");
597+
match is {
598+
Some(_) => catalog,
599+
None => Arc::new(CatalogWithInformationSchema::new(
600+
Arc::downgrade(&state.catalog_list),
601+
catalog,
602+
)),
603+
}
570604
} else {
571605
catalog
572606
};
@@ -1183,6 +1217,10 @@ impl ExecutionContextState {
11831217
table_ref: impl Into<TableReference<'a>>,
11841218
) -> Result<Arc<dyn SchemaProvider>> {
11851219
let resolved_ref = self.resolve_table_ref(table_ref.into());
1220+
println!(
1221+
"Resolved ref: {:?}:{:?}:{:?}",
1222+
resolved_ref.catalog, resolved_ref.schema, resolved_ref.table
1223+
);
11861224

11871225
self.catalog_list
11881226
.catalog(resolved_ref.catalog)

datafusion/tests/sql/information_schema.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use async_trait::async_trait;
1919
use datafusion::{
2020
catalog::{
21-
catalog::MemoryCatalogProvider,
21+
catalog::{CatalogProvider, MemoryCatalogProvider},
2222
schema::{MemorySchemaProvider, SchemaProvider},
2323
},
2424
datasource::{TableProvider, TableType},

0 commit comments

Comments
 (0)