Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions datafusion-cli/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ impl DynamicObjectStoreCatalogProvider {
}

impl CatalogProvider for DynamicObjectStoreCatalogProvider {
fn schema_names(&self) -> Vec<String> {
fn schema_names(&self) -> Result<Vec<String>> {
self.inner.schema_names()
}

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
fn schema(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>> {
let state = self.state.clone();
self.inner.schema(name).map(|schema| {
Ok(self.inner.schema(name)?.map(|schema| {
Arc::new(DynamicObjectStoreSchemaProvider::new(schema, state)) as _
})
}))
}

fn register_schema(
Expand Down Expand Up @@ -125,7 +125,7 @@ impl DynamicObjectStoreSchemaProvider {

#[async_trait]
impl SchemaProvider for DynamicObjectStoreSchemaProvider {
fn table_names(&self) -> Vec<String> {
fn table_names(&self) -> Result<Vec<String>> {
self.inner.table_names()
}

Expand Down Expand Up @@ -200,7 +200,7 @@ impl SchemaProvider for DynamicObjectStoreSchemaProvider {
self.inner.deregister_table(name)
}

fn table_exist(&self, name: &str) -> bool {
fn table_exist(&self, name: &str) -> Result<bool> {
self.inner.table_exist(name)
}
}
Expand Down Expand Up @@ -235,12 +235,22 @@ mod tests {
ctx.state().catalog_list().clone(),
ctx.state_weak_ref(),
) as &dyn CatalogProviderList;
let catalog_name = provider
.catalog_names()
.first()
.expect("default catalog should exist")
.clone();
let catalog = provider
.catalog(provider.catalog_names().first().unwrap())
.unwrap();
.catalog(&catalog_name)
.expect("default catalog should be retrievable");
let schema_names = catalog
.schema_names()
.expect("schema names lookup should succeed");
let schema_name = schema_names.first().expect("default schema should exist");
let schema = catalog
.schema(catalog.schema_names().first().unwrap())
.unwrap();
.schema(schema_name)
.expect("schema lookup should succeed")
.expect("default schema should be retrievable");
(ctx, schema)
}

Expand Down
18 changes: 9 additions & 9 deletions datafusion-examples/examples/data_io/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,19 +178,19 @@ impl DirSchema {

#[async_trait]
impl SchemaProvider for DirSchema {
fn table_names(&self) -> Vec<String> {
fn table_names(&self) -> Result<Vec<String>> {
let tables = self.tables.read().unwrap();
tables.keys().cloned().collect::<Vec<_>>()
Ok(tables.keys().cloned().collect::<Vec<_>>())
}

async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
let tables = self.tables.read().unwrap();
Ok(tables.get(name).cloned())
}

fn table_exist(&self, name: &str) -> bool {
fn table_exist(&self, name: &str) -> Result<bool> {
let tables = self.tables.read().unwrap();
tables.contains_key(name)
Ok(tables.contains_key(name))
}

fn register_table(
Expand Down Expand Up @@ -237,20 +237,20 @@ impl CatalogProvider for DirCatalog {
Ok(Some(schema))
}

fn schema_names(&self) -> Vec<String> {
fn schema_names(&self) -> Result<Vec<String>> {
let schemas = self.schemas.read().unwrap();
schemas.keys().cloned().collect()
Ok(schemas.keys().cloned().collect())
}

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
fn schema(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>> {
let schemas = self.schemas.read().unwrap();
let maybe_schema = schemas.get(name);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small cleanup suggestion: this manual if let Some can be collapsed to the direct lookup result, since the map already stores Arc<dyn SchemaProvider>:

Ok(schemas.get(name).cloned())

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will look into both of these tomorrow! Glad you like the changes :)

if let Some(schema) = maybe_schema {
Ok(if let Some(schema) = maybe_schema {
let schema = schema.clone() as Arc<dyn SchemaProvider>;
Some(schema)
} else {
None
}
})
}
}

Expand Down
38 changes: 29 additions & 9 deletions datafusion-examples/examples/flight/sql_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ impl FlightSqlServiceImpl {
}
}

async fn tables(&self, ctx: Arc<SessionContext>) -> RecordBatch {
async fn tables(&self, ctx: Arc<SessionContext>) -> Result<RecordBatch, Status> {
let schema = Arc::new(Schema::new(vec![
Field::new("catalog_name", DataType::Utf8, true),
Field::new("db_schema_name", DataType::Utf8, true),
Expand All @@ -176,12 +176,32 @@ impl FlightSqlServiceImpl {
let mut names = vec![];
let mut types = vec![];
for catalog in ctx.catalog_names() {
let catalog_provider = ctx.catalog(&catalog).unwrap();
for schema in catalog_provider.schema_names() {
let schema_provider = catalog_provider.schema(&schema).unwrap();
for table in schema_provider.table_names() {
let table_provider =
schema_provider.table(&table).await.unwrap().unwrap();
let catalog_provider = ctx.catalog(&catalog).ok_or_else(|| {
Status::internal(format!("Catalog not found: {catalog}"))
})?;
for schema in catalog_provider.schema_names().map_err(|e| {
Status::internal(format!("Error looking up schema names: {e}"))
})? {
let schema_provider = catalog_provider
.schema(&schema)
.map_err(|e| {
Status::internal(format!("Error looking up schema: {e}"))
})?
.ok_or_else(|| {
Status::internal(format!("Schema not found: {schema}"))
})?;
for table in schema_provider.table_names().map_err(|e| {
Status::internal(format!("Error looking up table names: {e}"))
})? {
let table_provider = schema_provider
.table(&table)
.await
.map_err(|e| {
Status::internal(format!("Error looking up table: {e}"))
})?
.ok_or_else(|| {
Status::internal(format!("Table not found: {table}"))
})?;
catalogs.push(catalog.clone());
schemas.push(schema.clone());
names.push(table.clone());
Expand All @@ -197,7 +217,7 @@ impl FlightSqlServiceImpl {
.map(|i| Arc::new(StringArray::from(i)) as ArrayRef)
.collect::<Vec<_>>(),
)
.unwrap()
.map_err(|e| Status::internal(format!("Error constructing record batch: {e}")))
}

fn remove_plan(&self, handle: &str) -> Result<(), Status> {
Expand Down Expand Up @@ -340,7 +360,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
) -> Result<Response<FlightInfo>, Status> {
info!("get_flight_info_tables");
let ctx = self.get_ctx(&request)?;
let data = self.tables(ctx).await;
let data = self.tables(ctx).await?;
let schema = data.schema();

let uuid = Uuid::new_v4().hyphenated().to_string();
Expand Down
6 changes: 3 additions & 3 deletions datafusion-examples/examples/udf/table_list_udtf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ impl TableFunctionImpl for TableListUdtf {
let Some(catalog) = catalog_list.catalog(&catalog_name) else {
continue;
};
for schema_name in catalog.schema_names() {
let Some(schema) = catalog.schema(&schema_name) else {
for schema_name in catalog.schema_names()? {
let Some(schema) = catalog.schema(&schema_name)? else {
continue;
};
for table_name in schema.table_names() {
for table_name in schema.table_names()? {
let Some(provider) = block_in_place(|| {
Handle::current().block_on(schema.table(&table_name))
})?
Expand Down
30 changes: 19 additions & 11 deletions datafusion/catalog/src/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ impl SchemaProvider for ResolvedSchemaProvider {
self.owner_name.as_deref()
}

fn table_names(&self) -> Vec<String> {
self.cached_tables.keys().cloned().collect()
fn table_names(&self) -> Result<Vec<String>> {
Ok(self.cached_tables.keys().cloned().collect())
}

async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
Expand All @@ -61,8 +61,8 @@ impl SchemaProvider for ResolvedSchemaProvider {
)
}

fn table_exist(&self, name: &str) -> bool {
self.cached_tables.contains_key(name)
fn table_exist(&self, name: &str) -> Result<bool> {
Ok(self.cached_tables.contains_key(name))
}
}

Expand Down Expand Up @@ -111,12 +111,12 @@ struct ResolvedCatalogProvider {
cached_schemas: HashMap<String, Arc<dyn SchemaProvider>>,
}
impl CatalogProvider for ResolvedCatalogProvider {
fn schema_names(&self) -> Vec<String> {
self.cached_schemas.keys().cloned().collect()
fn schema_names(&self) -> Result<Vec<String>> {
Ok(self.cached_schemas.keys().cloned().collect())
}

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
self.cached_schemas.get(name).cloned()
fn schema(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>> {
Ok(self.cached_schemas.get(name).cloned())
}
}

Expand Down Expand Up @@ -592,12 +592,19 @@ mod tests {
);

for schema_ref in found_schemas {
let schema = cached_provider.schema(schema_ref);
let schema = cached_provider
.schema(schema_ref)
.expect("schema lookup should succeed");
assert!(schema.is_some());
}

for schema_ref in not_found_schemas {
assert!(cached_provider.schema(schema_ref).is_none());
assert!(
cached_provider
.schema(schema_ref)
.expect("schema lookup should succeed")
.is_none()
);
}
}

Expand Down Expand Up @@ -729,7 +736,8 @@ mod tests {
.unwrap();
let schema = catalog
.schema(table_ref.schema().unwrap_or(MOCK_SCHEMA))
.unwrap();
.expect("schema lookup should succeed")
.expect("default schema should exist");
assert!(schema.table(table_ref.table()).await.unwrap().is_some());
}
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/catalog/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ use datafusion_common::not_impl_err;
/// [`TableProvider`]: crate::TableProvider
pub trait CatalogProvider: Any + Debug + Sync + Send {
/// Retrieves the list of available schema names in this catalog.
fn schema_names(&self) -> Vec<String>;
fn schema_names(&self) -> Result<Vec<String>>;

/// Retrieves a specific schema from the catalog by name, provided it exists.
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>>;
fn schema(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>>;

/// Adds a new schema to this catalog.
///
Expand Down
15 changes: 9 additions & 6 deletions datafusion/catalog/src/dynamic_file/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,20 @@ impl DynamicFileCatalogProvider {
}

impl CatalogProvider for DynamicFileCatalogProvider {
fn schema_names(&self) -> Vec<String> {
fn schema_names(&self) -> datafusion_common::Result<Vec<String>> {
self.inner.schema_names()
}

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
self.inner.schema(name).map(|schema| {
fn schema(
&self,
name: &str,
) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
Ok(self.inner.schema(name)?.map(|schema| {
Arc::new(DynamicFileSchemaProvider::new(
schema,
Arc::clone(&self.factory),
)) as _
})
}))
}

fn register_schema(
Expand Down Expand Up @@ -128,7 +131,7 @@ impl DynamicFileSchemaProvider {

#[async_trait]
impl SchemaProvider for DynamicFileSchemaProvider {
fn table_names(&self) -> Vec<String> {
fn table_names(&self) -> datafusion_common::Result<Vec<String>> {
self.inner.table_names()
}

Expand Down Expand Up @@ -158,7 +161,7 @@ impl SchemaProvider for DynamicFileSchemaProvider {
self.inner.deregister_table(name)
}

fn table_exist(&self, name: &str) -> bool {
fn table_exist(&self, name: &str) -> datafusion_common::Result<bool> {
self.inner.table_exist(name)
}
}
Expand Down
Loading
Loading