diff --git a/datafusion-examples/examples/remote_catalog.rs b/datafusion-examples/examples/remote_catalog.rs index 206b7ba9c4be..38629328d71c 100644 --- a/datafusion-examples/examples/remote_catalog.rs +++ b/datafusion-examples/examples/remote_catalog.rs @@ -32,22 +32,19 @@ use arrow::array::record_batch; use arrow_schema::{Field, Fields, Schema, SchemaRef}; use async_trait::async_trait; -use datafusion::catalog::{SchemaProvider, TableProvider}; -use datafusion::common::DataFusionError; +use datafusion::catalog::TableProvider; use datafusion::common::Result; use datafusion::execution::SendableRecordBatchStream; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::{DataFrame, SessionContext}; -use datafusion_catalog::Session; -use datafusion_common::{ - assert_batches_eq, internal_datafusion_err, plan_err, HashMap, TableReference, -}; +use datafusion_catalog::{AsyncSchemaProvider, Session}; +use datafusion_common::{assert_batches_eq, internal_datafusion_err, plan_err}; use datafusion_expr::{Expr, TableType}; use futures::TryStreamExt; use std::any::Any; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; #[tokio::main] async fn main() -> Result<()> { @@ -55,23 +52,18 @@ async fn main() -> Result<()> { let ctx = SessionContext::new(); // Make a connection to the remote catalog, asynchronously, and configure it - let remote_catalog_interface = RemoteCatalogInterface::connect().await?; + let remote_catalog_interface = Arc::new(RemoteCatalogInterface::connect().await?); - // Register a SchemaProvider for tables in a schema named "remote_schema". 
- // - // This will let DataFusion query tables such as - // `datafusion.remote_schema.remote_table` - let remote_schema: Arc = - Arc::new(RemoteSchema::new(Arc::new(remote_catalog_interface))); - ctx.catalog("datafusion") - .ok_or_else(|| internal_datafusion_err!("default catalog was not installed"))? - .register_schema("remote_schema", Arc::clone(&remote_schema))?; + // Create an adapter to provide the AsyncSchemaProvider interface to DataFusion + // based on our remote catalog interface + let remote_catalog_adapter = RemoteCatalogDatafusionAdapter(remote_catalog_interface); // Here is a query that selects data from a table in the remote catalog. let sql = "SELECT * from remote_schema.remote_table"; // The `SessionContext::sql` interface is async, but it does not - // support asynchronous access to catalogs, so the following query errors. + // support asynchronous access to catalogs, so we cannot register our schema provider + // directly and the following query fails to find our table. let results = ctx.sql(sql).await; assert_eq!( results.unwrap_err().to_string(), @@ -91,27 +83,26 @@ async fn main() -> Result<()> { // `remote_schema.remote_table`) let references = state.resolve_table_references(&statement)?; - // Call `load_tables` to load information from the remote catalog for each - // of the referenced tables. 
Best practice is to fetch the the information - // for all tables required by the query once (rather than one per table) to - // minimize network overhead - let table_names = references.iter().filter_map(|r| { - if refers_to_schema("datafusion", "remote_schema", r) { - Some(r.table()) - } else { - None - } - }); - remote_schema - .as_any() - .downcast_ref::() - .expect("correct types") - .load_tables(table_names) + // Now we can asynchronously resolve the table references to get a cached catalog + // that we can use for our query + let resolved_catalog = remote_catalog_adapter + .resolve(&references, state.config(), "datafusion", "remote_schema") .await?; - // Now continue planing the query after having fetched the remote table and - // it can run as normal - let plan = state.statement_to_plan(statement).await?; + // This resolved catalog only makes sense for this query and so we create a clone + // of the session context with the resolved catalog + let query_ctx = ctx.clone(); + + query_ctx + .catalog("datafusion") + .ok_or_else(|| internal_datafusion_err!("default catalog was not installed"))? 
+ .register_schema("remote_schema", resolved_catalog)?; + + // We can now continue planning the query with this new query-specific context that + // contains our cached catalog + let query_state = query_ctx.state(); + + let plan = query_state.statement_to_plan(statement).await?; let results = DataFrame::new(state, plan).collect().await?; assert_batches_eq!( [ @@ -145,9 +136,9 @@ impl RemoteCatalogInterface { } /// Fetches information for a specific table - pub async fn table_info(&self, name: &str) -> Result { + pub async fn table_info(&self, name: &str) -> Result> { if name != "remote_table" { - return plan_err!("Remote table not found: {}", name); + return Ok(None); } // In this example, we'll model a remote table with columns "id" and @@ -159,7 +150,7 @@ impl RemoteCatalogInterface { Field::new("id", arrow::datatypes::DataType::Int32, false), Field::new("name", arrow::datatypes::DataType::Utf8, false), ])); - Ok(Arc::new(schema)) + Ok(Some(Arc::new(schema))) } /// Fetches data for a table from a remote data source @@ -186,95 +177,22 @@ impl RemoteCatalogInterface { } } -/// Implements the DataFusion Catalog API interface for tables +/// Implements an async version of the DataFusion SchemaProvider API for tables /// stored in a remote catalog. -#[derive(Debug)] -struct RemoteSchema { - /// Connection with the remote catalog - remote_catalog_interface: Arc, - /// Local cache of tables that have been preloaded from the remote - /// catalog - tables: Mutex>>, -} - -impl RemoteSchema { - /// Create a new RemoteSchema - pub fn new(remote_catalog_interface: Arc) -> Self { - Self { - remote_catalog_interface, - tables: Mutex::new(HashMap::new()), - } - } - - /// Load information for the specified tables from the remote source into - /// the local cached copy. 
- pub async fn load_tables( - &self, - references: impl IntoIterator, - ) -> Result<()> { - for table_name in references { - if !self.table_exist(table_name) { - // Fetch information about the table from the remote catalog - // - // Note that a real remote catalog interface could return more - // information, but at the minimum, DataFusion requires the - // table's schema for planing. - let schema = self.remote_catalog_interface.table_info(table_name).await?; - let remote_table = RemoteTable::new( - Arc::clone(&self.remote_catalog_interface), - table_name, - schema, - ); - - // Add the table to our local cached list - self.tables - .lock() - .expect("mutex invalid") - .insert(table_name.to_string(), Arc::new(remote_table)); - }; - } - Ok(()) - } -} +struct RemoteCatalogDatafusionAdapter(Arc); -/// Implement the DataFusion Catalog API for [`RemoteSchema`] #[async_trait] -impl SchemaProvider for RemoteSchema { - fn as_any(&self) -> &dyn Any { - self - } - - fn table_names(&self) -> Vec { - // Note this API is not async so we can't directly call the RemoteCatalogInterface - // instead we use the cached list of loaded tables - self.tables - .lock() - .expect("mutex valid") - .keys() - .cloned() - .collect() - } - - // While this API is actually `async` and thus could consult a remote - // catalog directly it is more efficient to use a local cached copy instead, - // which is what we model in this example - async fn table( - &self, - name: &str, - ) -> Result>, DataFusionError> { - // Look for any pre-loaded tables - let table = self - .tables - .lock() - .expect("mutex valid") - .get(name) - .map(Arc::clone); - Ok(table) - } - - fn table_exist(&self, name: &str) -> bool { - // Look for any pre-loaded tables, note this function is also `async` - self.tables.lock().expect("mutex valid").contains_key(name) +impl AsyncSchemaProvider for RemoteCatalogDatafusionAdapter { + async fn table(&self, name: &str) -> Result>> { + // Fetch information about the table from the remote 
catalog + // + // Note that a real remote catalog interface could return more + // information, but at the minimum, DataFusion requires the + // table's schema for planning. + Ok(self.0.table_info(name).await?.map(|schema| { + Arc::new(RemoteTable::new(Arc::clone(&self.0), name, schema)) + as Arc + })) } } @@ -343,27 +261,3 @@ impl TableProvider for RemoteTable { )?)) } } - -/// Return true if this `table_reference` might be for a table in the specified -/// catalog and schema. -fn refers_to_schema( - catalog_name: &str, - schema_name: &str, - table_reference: &TableReference, -) -> bool { - // Check the references are in the correct catalog and schema - // references like foo.bar.baz - if let Some(catalog) = table_reference.catalog() { - if catalog != catalog_name { - return false; - } - } - // references like bar.baz - if let Some(schema) = table_reference.schema() { - if schema != schema_name { - return false; - } - } - - true -} diff --git a/datafusion/catalog/Cargo.toml b/datafusion/catalog/Cargo.toml index f9801352087d..32a87cc7611c 100644 --- a/datafusion/catalog/Cargo.toml +++ b/datafusion/catalog/Cargo.toml @@ -36,5 +36,8 @@ datafusion-expr = { workspace = true } datafusion-physical-plan = { workspace = true } parking_lot = { workspace = true } +[dev-dependencies] +tokio = { workspace = true } + [lints] workspace = true diff --git a/datafusion/catalog/src/async.rs b/datafusion/catalog/src/async.rs new file mode 100644 index 000000000000..504f20ff9543 --- /dev/null +++ b/datafusion/catalog/src/async.rs @@ -0,0 +1,747 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use datafusion_common::{error::Result, not_impl_err, HashMap, TableReference}; +use datafusion_execution::config::SessionConfig; + +use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider}; + +/// A schema provider that looks up tables in a cache +/// +/// Instances are created by the [`AsyncSchemaProvider::resolve`] method +#[derive(Debug)] +struct ResolvedSchemaProvider { + owner_name: Option, + cached_tables: HashMap>, +} +#[async_trait] +impl SchemaProvider for ResolvedSchemaProvider { + fn owner_name(&self) -> Option<&str> { + self.owner_name.as_deref() + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn table_names(&self) -> Vec { + self.cached_tables.keys().cloned().collect() + } + + async fn table(&self, name: &str) -> Result>> { + Ok(self.cached_tables.get(name).cloned()) + } + + fn register_table( + &self, + name: String, + _table: Arc, + ) -> Result>> { + not_impl_err!( + "Attempt to register table '{name}' with ResolvedSchemaProvider which is not supported" + ) + } + + fn deregister_table(&self, name: &str) -> Result>> { + not_impl_err!("Attempt to deregister table '{name}' with ResolvedSchemaProvider which is not supported") + } + + fn table_exist(&self, name: &str) -> bool { + self.cached_tables.contains_key(name) + } +} + +/// Helper class for building a [`ResolvedSchemaProvider`] +struct ResolvedSchemaProviderBuilder { + owner_name: String, + async_provider: Arc, + cached_tables: HashMap>>, +} +impl ResolvedSchemaProviderBuilder { + fn 
new(owner_name: String, async_provider: Arc) -> Self { + Self { + owner_name, + async_provider, + cached_tables: HashMap::new(), + } + } + + async fn resolve_table(&mut self, table_name: &str) -> Result<()> { + if !self.cached_tables.contains_key(table_name) { + let resolved_table = self.async_provider.table(table_name).await?; + self.cached_tables + .insert(table_name.to_string(), resolved_table); + } + Ok(()) + } + + fn finish(self) -> Arc { + let cached_tables = self + .cached_tables + .into_iter() + .filter_map(|(key, maybe_value)| maybe_value.map(|value| (key, value))) + .collect(); + Arc::new(ResolvedSchemaProvider { + owner_name: Some(self.owner_name), + cached_tables, + }) + } +} + +/// A catalog provider that looks up schemas in a cache +/// +/// Instances are created by the [`AsyncCatalogProvider::resolve`] method +#[derive(Debug)] +struct ResolvedCatalogProvider { + cached_schemas: HashMap>, +} +impl CatalogProvider for ResolvedCatalogProvider { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema_names(&self) -> Vec { + self.cached_schemas.keys().cloned().collect() + } + + fn schema(&self, name: &str) -> Option> { + self.cached_schemas.get(name).cloned() + } +} + +/// Helper class for building a [`ResolvedCatalogProvider`] +struct ResolvedCatalogProviderBuilder { + cached_schemas: HashMap>, + async_provider: Arc, +} +impl ResolvedCatalogProviderBuilder { + fn new(async_provider: Arc) -> Self { + Self { + cached_schemas: HashMap::new(), + async_provider, + } + } + fn finish(self) -> Arc { + let cached_schemas = self + .cached_schemas + .into_iter() + .filter_map(|(key, maybe_value)| { + maybe_value.map(|value| (key, value.finish())) + }) + .collect(); + Arc::new(ResolvedCatalogProvider { cached_schemas }) + } +} + +/// A catalog provider list that looks up catalogs in a cache +/// +/// Instances are created by the [`AsyncCatalogProviderList::resolve`] method +#[derive(Debug)] +struct ResolvedCatalogProviderList { + cached_catalogs: 
HashMap>, +} +impl CatalogProviderList for ResolvedCatalogProviderList { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn register_catalog( + &self, + _name: String, + _catalog: Arc, + ) -> Option> { + unimplemented!("resolved providers cannot handle registration APIs") + } + + fn catalog_names(&self) -> Vec { + self.cached_catalogs.keys().cloned().collect() + } + + fn catalog(&self, name: &str) -> Option> { + self.cached_catalogs.get(name).cloned() + } +} + +/// A trait for schema providers that must resolve tables asynchronously +/// +/// The [`SchemaProvider::table`] method _is_ asynchronous. However, this is primarily for convenience and +/// it is not a good idea for this method to be slow as this will cause poor planning performance. +/// +/// It is a better idea to resolve the tables once and cache them in memory for the duration of +/// planning. This trait helps implement that pattern. +/// +/// After implementing this trait you can call the [`AsyncSchemaProvider::resolve`] method to get an +/// `Arc<dyn SchemaProvider>` that contains a cached copy of the referenced tables. The `resolve` +/// method can be slow and asynchronous as it is only called once, before planning. +#[async_trait] +pub trait AsyncSchemaProvider: Send + Sync { + /// Lookup a table in the schema provider + async fn table(&self, name: &str) -> Result>>; + /// Creates a cached provider that can be used to execute a query containing given references + /// + /// This method will walk through the references and look them up once, creating a cache of table + /// providers. This cache will be returned as a synchronous SchemaProvider that can be used to plan + /// and execute a query containing the given references. + /// + /// This cache is intended to be short-lived for the execution of a single query. There is no mechanism + /// for refresh or eviction of stale entries. 
+ async fn resolve( + &self, + references: &[TableReference], + config: &SessionConfig, + catalog_name: &str, + schema_name: &str, + ) -> Result> { + let mut cached_tables = HashMap::>>::new(); + + for reference in references { + let ref_catalog_name = reference + .catalog() + .unwrap_or(&config.options().catalog.default_catalog); + + // Maybe this is a reference to some other catalog provided in another way + if ref_catalog_name != catalog_name { + continue; + } + + let ref_schema_name = reference + .schema() + .unwrap_or(&config.options().catalog.default_schema); + + if ref_schema_name != schema_name { + continue; + } + + if !cached_tables.contains_key(reference.table()) { + let resolved_table = self.table(reference.table()).await?; + cached_tables.insert(reference.table().to_string(), resolved_table); + } + } + + let cached_tables = cached_tables + .into_iter() + .filter_map(|(key, maybe_value)| maybe_value.map(|value| (key, value))) + .collect(); + + Ok(Arc::new(ResolvedSchemaProvider { + cached_tables, + owner_name: Some(catalog_name.to_string()), + })) + } +} + +/// A trait for catalog providers that must resolve schemas asynchronously +/// +/// The [`CatalogProvider::schema`] method is synchronous because asynchronous operations should +/// not be used during planning. This trait makes it easy to lookup schema references once and cache +/// them for future planning use. See [`AsyncSchemaProvider`] for more details on motivation. + +#[async_trait] +pub trait AsyncCatalogProvider: Send + Sync { + /// Lookup a schema in the provider + async fn schema(&self, name: &str) -> Result>>; + + /// Creates a cached provider that can be used to execute a query containing given references + /// + /// This method will walk through the references and look them up once, creating a cache of schema + /// providers (each with their own cache of table providers). 
This cache will be returned as a + /// synchronous CatalogProvider that can be used to plan and execute a query containing the given + /// references. + /// + /// This cache is intended to be short-lived for the execution of a single query. There is no mechanism + /// for refresh or eviction of stale entries. + async fn resolve( + &self, + references: &[TableReference], + config: &SessionConfig, + catalog_name: &str, + ) -> Result> { + let mut cached_schemas = + HashMap::>::new(); + + for reference in references { + let ref_catalog_name = reference + .catalog() + .unwrap_or(&config.options().catalog.default_catalog); + + // Maybe this is a reference to some other catalog provided in another way + if ref_catalog_name != catalog_name { + continue; + } + + let schema_name = reference + .schema() + .unwrap_or(&config.options().catalog.default_schema); + + let schema = if let Some(schema) = cached_schemas.get_mut(schema_name) { + schema + } else { + let resolved_schema = self.schema(schema_name).await?; + let resolved_schema = resolved_schema.map(|resolved_schema| { + ResolvedSchemaProviderBuilder::new( + catalog_name.to_string(), + resolved_schema, + ) + }); + cached_schemas.insert(schema_name.to_string(), resolved_schema); + cached_schemas.get_mut(schema_name).unwrap() + }; + + // If we can't find the schema don't bother checking the table + let Some(schema) = schema else { continue }; + + schema.resolve_table(reference.table()).await?; + } + + let cached_schemas = cached_schemas + .into_iter() + .filter_map(|(key, maybe_builder)| { + maybe_builder.map(|schema_builder| (key, schema_builder.finish())) + }) + .collect::>(); + + Ok(Arc::new(ResolvedCatalogProvider { cached_schemas })) + } +} + +/// A trait for catalog provider lists that must resolve catalogs asynchronously +/// +/// The [`CatalogProviderList::catalog`] method is synchronous because asynchronous operations should +/// not be used during planning. 
This trait makes it easy to look up catalog references once and cache +/// them for future planning use. See [`AsyncSchemaProvider`] for more details on motivation. +#[async_trait] +pub trait AsyncCatalogProviderList: Send + Sync { + /// Lookup a catalog in the provider + async fn catalog(&self, name: &str) -> Result>>; + + /// Creates a cached provider that can be used to execute a query containing given references + /// + /// This method will walk through the references and look them up once, creating a cache of catalog + /// providers, schema providers, and table providers. This cache will be returned as a + /// synchronous CatalogProviderList that can be used to plan and execute a query containing the given + /// references. + /// + /// This cache is intended to be short-lived for the execution of a single query. There is no mechanism + /// for refresh or eviction of stale entries. + async fn resolve( + &self, + references: &[TableReference], + config: &SessionConfig, + ) -> Result> { + let mut cached_catalogs = + HashMap::>::new(); + + for reference in references { + let catalog_name = reference + .catalog() + .unwrap_or(&config.options().catalog.default_catalog); + + // We will do three lookups here, one for the catalog, one for the schema, and one for the table + // We cache the result (both found results and not-found results) to speed up future lookups + // + // Note that a cache-miss is not an error at this point. We allow for the possibility that + // other providers may supply the reference. + // + // If this is the only provider then a not-found error will be raised during planning when it can't + // find the reference in the cache. 
+ + let catalog = if let Some(catalog) = cached_catalogs.get_mut(catalog_name) { + catalog + } else { + let resolved_catalog = self.catalog(catalog_name).await?; + let resolved_catalog = + resolved_catalog.map(ResolvedCatalogProviderBuilder::new); + cached_catalogs.insert(catalog_name.to_string(), resolved_catalog); + cached_catalogs.get_mut(catalog_name).unwrap() + }; + + // If we can't find the catalog don't bother checking the schema / table + let Some(catalog) = catalog else { continue }; + + let schema_name = reference + .schema() + .unwrap_or(&config.options().catalog.default_schema); + + let schema = if let Some(schema) = catalog.cached_schemas.get_mut(schema_name) + { + schema + } else { + let resolved_schema = catalog.async_provider.schema(schema_name).await?; + let resolved_schema = resolved_schema.map(|async_schema| { + ResolvedSchemaProviderBuilder::new( + catalog_name.to_string(), + async_schema, + ) + }); + catalog + .cached_schemas + .insert(schema_name.to_string(), resolved_schema); + catalog.cached_schemas.get_mut(schema_name).unwrap() + }; + + // If we can't find the schema don't bother checking the table + let Some(schema) = schema else { continue }; + + schema.resolve_table(reference.table()).await?; + } + + // Build the cached catalog provider list + let cached_catalogs = cached_catalogs + .into_iter() + .filter_map(|(key, maybe_builder)| { + maybe_builder.map(|catalog_builder| (key, catalog_builder.finish())) + }) + .collect::>(); + + Ok(Arc::new(ResolvedCatalogProviderList { cached_catalogs })) + } +} + +#[cfg(test)] +mod tests { + use std::{ + any::Any, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, + }; + + use arrow_schema::SchemaRef; + use async_trait::async_trait; + use datafusion_common::{error::Result, Statistics, TableReference}; + use datafusion_execution::config::SessionConfig; + use datafusion_expr::{Expr, TableType}; + use datafusion_physical_plan::ExecutionPlan; + + use crate::{Session, TableProvider}; + + use 
super::{AsyncCatalogProvider, AsyncCatalogProviderList, AsyncSchemaProvider}; + + #[derive(Debug)] + struct MockTableProvider {} + #[async_trait] + impl TableProvider for MockTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + /// Get a reference to the schema for this table + fn schema(&self) -> SchemaRef { + unimplemented!() + } + + fn table_type(&self) -> TableType { + unimplemented!() + } + + async fn scan( + &self, + _state: &dyn Session, + _projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + unimplemented!() + } + + fn statistics(&self) -> Option { + unimplemented!() + } + } + + #[derive(Default)] + struct MockAsyncSchemaProvider { + lookup_count: AtomicU32, + } + + const MOCK_CATALOG: &str = "mock_catalog"; + const MOCK_SCHEMA: &str = "mock_schema"; + const MOCK_TABLE: &str = "mock_table"; + + #[async_trait] + impl AsyncSchemaProvider for MockAsyncSchemaProvider { + async fn table(&self, name: &str) -> Result>> { + self.lookup_count.fetch_add(1, Ordering::Release); + if name == MOCK_TABLE { + Ok(Some(Arc::new(MockTableProvider {}))) + } else { + Ok(None) + } + } + } + + fn test_config() -> SessionConfig { + let mut config = SessionConfig::default(); + config.options_mut().catalog.default_catalog = MOCK_CATALOG.to_string(); + config.options_mut().catalog.default_schema = MOCK_SCHEMA.to_string(); + config + } + + #[tokio::test] + async fn test_async_schema_provider_resolve() { + async fn check( + refs: Vec, + expected_lookup_count: u32, + found_tables: &[&str], + not_found_tables: &[&str], + ) { + let async_provider = MockAsyncSchemaProvider::default(); + let cached_provider = async_provider + .resolve(&refs, &test_config(), MOCK_CATALOG, MOCK_SCHEMA) + .await + .unwrap(); + + assert_eq!( + async_provider.lookup_count.load(Ordering::Acquire), + expected_lookup_count + ); + + for table_ref in found_tables { + let table = cached_provider.table(table_ref).await.unwrap(); + assert!(table.is_some()); + } + + for table_ref 
in not_found_tables { + assert!(cached_provider.table(table_ref).await.unwrap().is_none()); + } + } + + // Basic full lookups + check( + vec![ + TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, MOCK_TABLE), + TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, "not_exists"), + ], + 2, + &[MOCK_TABLE], + &["not_exists"], + ) + .await; + + // Catalog / schema mismatch doesn't even search + check( + vec![ + TableReference::full(MOCK_CATALOG, "foo", MOCK_TABLE), + TableReference::full("foo", MOCK_SCHEMA, MOCK_TABLE), + ], + 0, + &[], + &[MOCK_TABLE], + ) + .await; + + // Both hits and misses cached + check( + vec![ + TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, MOCK_TABLE), + TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, MOCK_TABLE), + TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, "not_exists"), + TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, "not_exists"), + ], + 2, + &[MOCK_TABLE], + &["not_exists"], + ) + .await; + } + + #[derive(Default)] + struct MockAsyncCatalogProvider { + lookup_count: AtomicU32, + } + + #[async_trait] + impl AsyncCatalogProvider for MockAsyncCatalogProvider { + async fn schema( + &self, + name: &str, + ) -> Result>> { + self.lookup_count.fetch_add(1, Ordering::Release); + if name == MOCK_SCHEMA { + Ok(Some(Arc::new(MockAsyncSchemaProvider::default()))) + } else { + Ok(None) + } + } + } + + #[tokio::test] + async fn test_async_catalog_provider_resolve() { + async fn check( + refs: Vec, + expected_lookup_count: u32, + found_schemas: &[&str], + not_found_schemas: &[&str], + ) { + let async_provider = MockAsyncCatalogProvider::default(); + let cached_provider = async_provider + .resolve(&refs, &test_config(), MOCK_CATALOG) + .await + .unwrap(); + + assert_eq!( + async_provider.lookup_count.load(Ordering::Acquire), + expected_lookup_count + ); + + for schema_ref in found_schemas { + let schema = cached_provider.schema(schema_ref); + assert!(schema.is_some()); + } + + for schema_ref in not_found_schemas { + 
assert!(cached_provider.schema(schema_ref).is_none()); + } + } + + // Basic full lookups + check( + vec![ + TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, "x"), + TableReference::full(MOCK_CATALOG, "not_exists", "x"), + ], + 2, + &[MOCK_SCHEMA], + &["not_exists"], + ) + .await; + + // Catalog mismatch doesn't even search + check( + vec![TableReference::full("foo", MOCK_SCHEMA, "x")], + 0, + &[], + &[MOCK_SCHEMA], + ) + .await; + + // Both hits and misses cached + check( + vec![ + TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, "x"), + TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, "x"), + TableReference::full(MOCK_CATALOG, "not_exists", "x"), + TableReference::full(MOCK_CATALOG, "not_exists", "x"), + ], + 2, + &[MOCK_SCHEMA], + &["not_exists"], + ) + .await; + } + + #[derive(Default)] + struct MockAsyncCatalogProviderList { + lookup_count: AtomicU32, + } + + #[async_trait] + impl AsyncCatalogProviderList for MockAsyncCatalogProviderList { + async fn catalog( + &self, + name: &str, + ) -> Result>> { + self.lookup_count.fetch_add(1, Ordering::Release); + if name == MOCK_CATALOG { + Ok(Some(Arc::new(MockAsyncCatalogProvider::default()))) + } else { + Ok(None) + } + } + } + + #[tokio::test] + async fn test_async_catalog_provider_list_resolve() { + async fn check( + refs: Vec, + expected_lookup_count: u32, + found_catalogs: &[&str], + not_found_catalogs: &[&str], + ) { + let async_provider = MockAsyncCatalogProviderList::default(); + let cached_provider = + async_provider.resolve(&refs, &test_config()).await.unwrap(); + + assert_eq!( + async_provider.lookup_count.load(Ordering::Acquire), + expected_lookup_count + ); + + for catalog_ref in found_catalogs { + let catalog = cached_provider.catalog(catalog_ref); + assert!(catalog.is_some()); + } + + for catalog_ref in not_found_catalogs { + assert!(cached_provider.catalog(catalog_ref).is_none()); + } + } + + // Basic full lookups + check( + vec![ + TableReference::full(MOCK_CATALOG, "x", "x"), + 
TableReference::full("not_exists", "x", "x"), + ], + 2, + &[MOCK_CATALOG], + &["not_exists"], + ) + .await; + + // Both hits and misses cached + check( + vec![ + TableReference::full(MOCK_CATALOG, "x", "x"), + TableReference::full(MOCK_CATALOG, "x", "x"), + TableReference::full("not_exists", "x", "x"), + TableReference::full("not_exists", "x", "x"), + ], + 2, + &[MOCK_CATALOG], + &["not_exists"], + ) + .await; + } + + #[tokio::test] + async fn test_defaults() { + for table_ref in &[ + TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, MOCK_TABLE), + TableReference::partial(MOCK_SCHEMA, MOCK_TABLE), + TableReference::bare(MOCK_TABLE), + ] { + let async_provider = MockAsyncCatalogProviderList::default(); + let cached_provider = async_provider + .resolve(&[table_ref.clone()], &test_config()) + .await + .unwrap(); + + let catalog = cached_provider + .catalog(table_ref.catalog().unwrap_or(MOCK_CATALOG)) + .unwrap(); + let schema = catalog + .schema(table_ref.schema().unwrap_or(MOCK_SCHEMA)) + .unwrap(); + assert!(schema.table(table_ref.table()).await.unwrap().is_some()); + } + } +} diff --git a/datafusion/catalog/src/lib.rs b/datafusion/catalog/src/lib.rs index 21630f267d2c..3cf2a3b3cd33 100644 --- a/datafusion/catalog/src/lib.rs +++ b/datafusion/catalog/src/lib.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +mod r#async; mod catalog; mod dynamic_file; mod schema; @@ -23,6 +24,7 @@ mod table; pub use catalog::*; pub use dynamic_file::catalog::*; +pub use r#async::*; pub use schema::*; pub use session::*; pub use table::*;