diff --git a/Cargo.lock b/Cargo.lock index c8f154346..3b07ec8a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3074,6 +3074,7 @@ dependencies = [ "futures", "iceberg", "iceberg-catalog-memory", + "log", "tempfile", "tokio", ] diff --git a/crates/integration_tests/tests/datafusion.rs b/crates/integration_tests/tests/datafusion.rs index 653497980..1d4e8275d 100644 --- a/crates/integration_tests/tests/datafusion.rs +++ b/crates/integration_tests/tests/datafusion.rs @@ -38,6 +38,8 @@ async fn test_statistics() -> Result<()> { let stats = IcebergTableProvider::try_new_from_table(table) .await? + .with_computed_statistics() + .await .statistics(); assert_eq!( diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml index 81a94d839..496089f12 100644 --- a/crates/integrations/datafusion/Cargo.toml +++ b/crates/integrations/datafusion/Cargo.toml @@ -34,6 +34,7 @@ async-trait = { workspace = true } datafusion = { version = "44" } futures = { workspace = true } iceberg = { workspace = true } +log = { workspace = true } tokio = { workspace = true } [dev-dependencies] diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 6b9850e5f..7e801a48a 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -50,13 +50,12 @@ pub struct IcebergTableProvider { } impl IcebergTableProvider { - pub(crate) async fn new(table: Table, schema: ArrowSchemaRef) -> Self { - let statistics = compute_statistics(&table, None).await.ok(); + pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self { IcebergTableProvider { table, snapshot_id: None, schema, - statistics, + statistics: None, } } /// Asynchronously tries to construct a new [`IcebergTableProvider`] @@ -72,12 +71,11 @@ impl IcebergTableProvider { let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?); - let statistics = compute_statistics(&table,
None).await.ok(); Ok(IcebergTableProvider { table, snapshot_id: None, schema, - statistics, + statistics: None, }) } @@ -85,12 +83,11 @@ impl IcebergTableProvider { /// using the given table. Can be used to create a table provider from an existing table regardless of the catalog implementation. pub async fn try_new_from_table(table: Table) -> Result { let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?); - let statistics = compute_statistics(&table, None).await.ok(); Ok(IcebergTableProvider { table, snapshot_id: None, schema, - statistics, + statistics: None, }) } @@ -111,14 +108,23 @@ impl IcebergTableProvider { })?; let schema = snapshot.schema(table.metadata())?; let schema = Arc::new(schema_to_arrow_schema(&schema)?); - let statistics = compute_statistics(&table, Some(snapshot_id)).await.ok(); Ok(IcebergTableProvider { table, snapshot_id: Some(snapshot_id), schema, - statistics, + statistics: None, }) } + + /// Try to compute the underlying table statistics directly from the manifest/data files. If that fails, a warning is logged and the provider is returned with `statistics` set to `None`. + pub async fn with_computed_statistics(mut self) -> Self { + let statistics = compute_statistics(&self.table, self.snapshot_id) + .await + .inspect_err(|err| log::warn!("Failed computing table statistics: {err}")) + .ok(); + self.statistics = statistics; + self + } } #[async_trait] diff --git a/crates/integrations/datafusion/src/table/table_provider_factory.rs b/crates/integrations/datafusion/src/table/table_provider_factory.rs index c2aea23f0..15a3fef68 100644 --- a/crates/integrations/datafusion/src/table/table_provider_factory.rs +++ b/crates/integrations/datafusion/src/table/table_provider_factory.rs @@ -129,9 +129,7 @@ impl TableProviderFactory for IcebergTableProviderFactory { let schema = schema_to_arrow_schema(table.metadata().current_schema()) .map_err(to_datafusion_error)?; - Ok(Arc::new( - IcebergTableProvider::new(table, Arc::new(schema)).await, - )) + Ok(Arc::new(IcebergTableProvider::new(table, Arc::new(schema)))) } }