Skip to content

Commit

Permalink
Make statistics computation opt-in for IcebergTableProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
gruuya committed Jan 9, 2025
1 parent 98bdd8a commit 979d07a
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 12 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/integration_tests/tests/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ async fn test_statistics() -> Result<()> {

let stats = IcebergTableProvider::try_new_from_table(table)
.await?
.with_computed_statistics()
.await
.statistics();

assert_eq!(
Expand Down
1 change: 1 addition & 0 deletions crates/integrations/datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ async-trait = { workspace = true }
datafusion = { version = "44" }
futures = { workspace = true }
iceberg = { workspace = true }
log = { workspace = true }
tokio = { workspace = true }

[dev-dependencies]
Expand Down
24 changes: 15 additions & 9 deletions crates/integrations/datafusion/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,12 @@ pub struct IcebergTableProvider {
}

impl IcebergTableProvider {
pub(crate) async fn new(table: Table, schema: ArrowSchemaRef) -> Self {
let statistics = compute_statistics(&table, None).await.ok();
pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self {
IcebergTableProvider {
table,
snapshot_id: None,
schema,
statistics,
statistics: None,
}
}
/// Asynchronously tries to construct a new [`IcebergTableProvider`]
Expand All @@ -72,25 +71,23 @@ impl IcebergTableProvider {

let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

let statistics = compute_statistics(&table, None).await.ok();
Ok(IcebergTableProvider {
table,
snapshot_id: None,
schema,
statistics,
statistics: None,
})
}

/// Asynchronously tries to construct a new [`IcebergTableProvider`]
/// using the given table. Can be used to create a table provider from an existing table regardless of the catalog implementation.
pub async fn try_new_from_table(table: Table) -> Result<Self> {
let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
let statistics = compute_statistics(&table, None).await.ok();
Ok(IcebergTableProvider {
table,
snapshot_id: None,
schema,
statistics,
statistics: None,
})
}

Expand All @@ -111,14 +108,23 @@ impl IcebergTableProvider {
})?;
let schema = snapshot.schema(table.metadata())?;
let schema = Arc::new(schema_to_arrow_schema(&schema)?);
let statistics = compute_statistics(&table, Some(snapshot_id)).await.ok();
Ok(IcebergTableProvider {
table,
snapshot_id: Some(snapshot_id),
schema,
statistics,
statistics: None,
})
}

/// Opt-in step that tries to compute the underlying table statistics
/// (per the original note: directly from the manifest/data files).
///
/// Consumes the provider, calls `compute_statistics` for its table and
/// (optional) snapshot id, stores the outcome in `statistics`, and returns
/// the provider so the call can be chained after a constructor.
///
/// This method never fails: if the computation returns an error, a warning
/// is logged and `statistics` is set to `None`, replacing any value that
/// was stored before.
pub async fn with_computed_statistics(mut self) -> Self {
    self.statistics = match compute_statistics(&self.table, self.snapshot_id).await {
        Ok(computed) => Some(computed),
        Err(err) => {
            // Best-effort: report the failure but keep the provider usable.
            log::warn!("Failed computing table statistics: {err}");
            None
        }
    };
    self
}
}

#[async_trait]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,7 @@ impl TableProviderFactory for IcebergTableProviderFactory {
let schema = schema_to_arrow_schema(table.metadata().current_schema())
.map_err(to_datafusion_error)?;

Ok(Arc::new(
IcebergTableProvider::new(table, Arc::new(schema)).await,
))
Ok(Arc::new(IcebergTableProvider::new(table, Arc::new(schema))))
}
}

Expand Down

0 comments on commit 979d07a

Please sign in to comment.