Skip to content

Commit 6cf74d6

Browse files
adriangb and alamb authored
Make SessionContext::register_parquet obey collect_statistics config (#16080)
* fix
* add a test
* fmt
* add to upgrade guide
* fix tests
* fix test
* fix test
* fix ci
* Fix example in upgrade guide (#29)

---------

Co-authored-by: Andrew Lamb <[email protected]>
1 parent ca55f1c commit 6cf74d6

File tree

8 files changed

+114
-17
lines changed

8 files changed

+114
-17
lines changed

datafusion/core/src/datasource/file_format/options.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
550550

551551
ListingOptions::new(Arc::new(file_format))
552552
.with_file_extension(self.file_extension)
553-
.with_target_partitions(config.target_partitions())
553+
.with_session_config_options(config)
554554
.with_table_partition_cols(self.table_partition_cols.clone())
555555
.with_file_sort_order(self.file_sort_order.clone())
556556
}
@@ -585,9 +585,9 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> {
585585

586586
ListingOptions::new(Arc::new(file_format))
587587
.with_file_extension(self.file_extension)
588-
.with_target_partitions(config.target_partitions())
589588
.with_table_partition_cols(self.table_partition_cols.clone())
590589
.with_file_sort_order(self.file_sort_order.clone())
590+
.with_session_config_options(config)
591591
}
592592

593593
async fn get_resolved_schema(
@@ -615,7 +615,7 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> {
615615

616616
ListingOptions::new(Arc::new(file_format))
617617
.with_file_extension(self.file_extension)
618-
.with_target_partitions(config.target_partitions())
618+
.with_session_config_options(config)
619619
.with_table_partition_cols(self.table_partition_cols.clone())
620620
.with_file_sort_order(self.file_sort_order.clone())
621621
}
@@ -643,7 +643,7 @@ impl ReadOptions<'_> for AvroReadOptions<'_> {
643643

644644
ListingOptions::new(Arc::new(file_format))
645645
.with_file_extension(self.file_extension)
646-
.with_target_partitions(config.target_partitions())
646+
.with_session_config_options(config)
647647
.with_table_partition_cols(self.table_partition_cols.clone())
648648
}
649649

@@ -669,7 +669,7 @@ impl ReadOptions<'_> for ArrowReadOptions<'_> {
669669

670670
ListingOptions::new(Arc::new(file_format))
671671
.with_file_extension(self.file_extension)
672-
.with_target_partitions(config.target_partitions())
672+
.with_session_config_options(config)
673673
.with_table_partition_cols(self.table_partition_cols.clone())
674674
}
675675

datafusion/core/src/datasource/listing/table.rs

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use datafusion_catalog::TableProvider;
3232
use datafusion_common::{config_err, DataFusionError, Result};
3333
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
3434
use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
35+
use datafusion_execution::config::SessionConfig;
3536
use datafusion_expr::dml::InsertOp;
3637
use datafusion_expr::{Expr, TableProviderFilterPushDown};
3738
use datafusion_expr::{SortExpr, TableType};
@@ -195,7 +196,8 @@ impl ListingTableConfig {
195196

196197
let listing_options = ListingOptions::new(file_format)
197198
.with_file_extension(listing_file_extension)
198-
.with_target_partitions(state.config().target_partitions());
199+
.with_target_partitions(state.config().target_partitions())
200+
.with_collect_stat(state.config().collect_statistics());
199201

200202
Ok(Self {
201203
table_paths: self.table_paths,
@@ -313,18 +315,29 @@ impl ListingOptions {
313315
/// - use default file extension filter
314316
/// - no input partition to discover
315317
/// - one target partition
316-
/// - stat collection
318+
/// - do not collect statistics
317319
pub fn new(format: Arc<dyn FileFormat>) -> Self {
318320
Self {
319321
file_extension: format.get_ext(),
320322
format,
321323
table_partition_cols: vec![],
322-
collect_stat: true,
324+
collect_stat: false,
323325
target_partitions: 1,
324326
file_sort_order: vec![],
325327
}
326328
}
327329

330+
/// Set options from [`SessionConfig`] and returns self.
331+
///
332+
/// Currently this sets `target_partitions` and `collect_stat`
333+
/// but if more options are added in the future that need to be coordinated
334+
/// they will be synchronized through this method.
335+
pub fn with_session_config_options(mut self, config: &SessionConfig) -> Self {
336+
self = self.with_target_partitions(config.target_partitions());
337+
self = self.with_collect_stat(config.collect_statistics());
338+
self
339+
}
340+
328341
/// Set file extension on [`ListingOptions`] and returns self.
329342
///
330343
/// # Example
@@ -1282,7 +1295,9 @@ mod tests {
12821295

12831296
#[tokio::test]
12841297
async fn read_single_file() -> Result<()> {
1285-
let ctx = SessionContext::new();
1298+
let ctx = SessionContext::new_with_config(
1299+
SessionConfig::new().with_collect_statistics(true),
1300+
);
12861301

12871302
let table = load_table(&ctx, "alltypes_plain.parquet").await?;
12881303
let projection = None;
@@ -1309,7 +1324,7 @@ mod tests {
13091324

13101325
#[cfg(feature = "parquet")]
13111326
#[tokio::test]
1312-
async fn load_table_stats_by_default() -> Result<()> {
1327+
async fn do_not_load_table_stats_by_default() -> Result<()> {
13131328
use crate::datasource::file_format::parquet::ParquetFormat;
13141329

13151330
let testdata = crate::test_util::parquet_test_data();
@@ -1321,6 +1336,22 @@ mod tests {
13211336

13221337
let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
13231338
let schema = opt.infer_schema(&state, &table_path).await?;
1339+
let config = ListingTableConfig::new(table_path.clone())
1340+
.with_listing_options(opt)
1341+
.with_schema(schema);
1342+
let table = ListingTable::try_new(config)?;
1343+
1344+
let exec = table.scan(&state, None, &[], None).await?;
1345+
assert_eq!(exec.partition_statistics(None)?.num_rows, Precision::Absent);
1346+
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
1347+
assert_eq!(
1348+
exec.partition_statistics(None)?.total_byte_size,
1349+
Precision::Absent
1350+
);
1351+
1352+
let opt = ListingOptions::new(Arc::new(ParquetFormat::default()))
1353+
.with_collect_stat(true);
1354+
let schema = opt.infer_schema(&state, &table_path).await?;
13241355
let config = ListingTableConfig::new(table_path)
13251356
.with_listing_options(opt)
13261357
.with_schema(schema);

datafusion/core/src/datasource/listing_table_factory.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,8 @@ impl TableProviderFactory for ListingTableFactory {
111111
let table_path = ListingTableUrl::parse(&cmd.location)?;
112112

113113
let options = ListingOptions::new(file_format)
114-
.with_collect_stat(state.config().collect_statistics())
115114
.with_file_extension(file_extension)
116-
.with_target_partitions(state.config().target_partitions())
115+
.with_session_config_options(session_state.config())
117116
.with_table_partition_cols(table_partition_cols);
118117

119118
options

datafusion/core/src/execution/context/parquet.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ mod tests {
8484
use crate::parquet::basic::Compression;
8585
use crate::test_util::parquet_test_data;
8686

87+
use arrow::util::pretty::pretty_format_batches;
88+
use datafusion_common::assert_contains;
8789
use datafusion_common::config::TableParquetOptions;
8890
use datafusion_execution::config::SessionConfig;
8991

@@ -129,6 +131,49 @@ mod tests {
129131
Ok(())
130132
}
131133

134+
async fn explain_query_all_with_config(config: SessionConfig) -> Result<String> {
135+
let ctx = SessionContext::new_with_config(config);
136+
137+
ctx.register_parquet(
138+
"test",
139+
&format!("{}/alltypes_plain*.parquet", parquet_test_data()),
140+
ParquetReadOptions::default(),
141+
)
142+
.await?;
143+
let df = ctx.sql("EXPLAIN SELECT * FROM test").await?;
144+
let results = df.collect().await?;
145+
let content = pretty_format_batches(&results).unwrap().to_string();
146+
Ok(content)
147+
}
148+
149+
#[tokio::test]
150+
async fn register_parquet_respects_collect_statistics_config() -> Result<()> {
151+
// The default is false
152+
let mut config = SessionConfig::new();
153+
config.options_mut().explain.physical_plan_only = true;
154+
config.options_mut().explain.show_statistics = true;
155+
let content = explain_query_all_with_config(config).await?;
156+
assert_contains!(content, "statistics=[Rows=Absent,");
157+
158+
// Explicitly set to false
159+
let mut config = SessionConfig::new();
160+
config.options_mut().explain.physical_plan_only = true;
161+
config.options_mut().explain.show_statistics = true;
162+
config.options_mut().execution.collect_statistics = false;
163+
let content = explain_query_all_with_config(config).await?;
164+
assert_contains!(content, "statistics=[Rows=Absent,");
165+
166+
// Explicitly set to true
167+
let mut config = SessionConfig::new();
168+
config.options_mut().explain.physical_plan_only = true;
169+
config.options_mut().explain.show_statistics = true;
170+
config.options_mut().execution.collect_statistics = true;
171+
let content = explain_query_all_with_config(config).await?;
172+
assert_contains!(content, "statistics=[Rows=Exact(10),");
173+
174+
Ok(())
175+
}
176+
132177
#[tokio::test]
133178
async fn read_from_registered_table_with_glob_path() -> Result<()> {
134179
let ctx = SessionContext::new();

datafusion/core/tests/parquet/file_statistics.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ async fn check_stats_precision_with_filter_pushdown() {
5050
let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
5151
let table_path = ListingTableUrl::parse(filename).unwrap();
5252

53-
let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
53+
let opt =
54+
ListingOptions::new(Arc::new(ParquetFormat::default())).with_collect_stat(true);
5455
let table = get_listing_table(&table_path, None, &opt).await;
5556

5657
let (_, _, state) = get_cache_runtime_state();
@@ -109,7 +110,8 @@ async fn load_table_stats_with_session_level_cache() {
109110
// Create a separate DefaultFileStatisticsCache
110111
let (cache2, _, state2) = get_cache_runtime_state();
111112

112-
let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
113+
let opt =
114+
ListingOptions::new(Arc::new(ParquetFormat::default())).with_collect_stat(true);
113115

114116
let table1 = get_listing_table(&table_path, Some(cache1), &opt).await;
115117
let table2 = get_listing_table(&table_path, Some(cache2), &opt).await;

datafusion/core/tests/sql/explain_analyze.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,9 @@ async fn csv_explain_verbose_plans() {
561561
async fn explain_analyze_runs_optimizers(#[values("*", "1")] count_expr: &str) {
562562
// repro for https://github.com/apache/datafusion/issues/917
563563
// where EXPLAIN ANALYZE was not correctly running optimizer
564-
let ctx = SessionContext::new();
564+
let ctx = SessionContext::new_with_config(
565+
SessionConfig::new().with_collect_statistics(true),
566+
);
565567
register_alltypes_parquet(&ctx).await;
566568

567569
// This happens as an optimization pass where count(*)/count(1) can be

datafusion/core/tests/sql/path_partition.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,9 @@ async fn parquet_multiple_nonstring_partitions() -> Result<()> {
431431

432432
#[tokio::test]
433433
async fn parquet_statistics() -> Result<()> {
434-
let ctx = SessionContext::new();
434+
let mut config = SessionConfig::new();
435+
config.options_mut().execution.collect_statistics = true;
436+
let ctx = SessionContext::new_with_config(config);
435437

436438
register_partitioned_alltypes_parquet(
437439
&ctx,
@@ -583,7 +585,8 @@ async fn create_partitioned_alltypes_parquet_table(
583585
.iter()
584586
.map(|x| (x.0.to_owned(), x.1.clone()))
585587
.collect::<Vec<_>>(),
586-
);
588+
)
589+
.with_session_config_options(&ctx.copied_config());
587590

588591
let table_path = ListingTableUrl::parse(table_path).unwrap();
589592
let store_path =

docs/source/library-user-guide/upgrading.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,21 @@
2121

2222
## DataFusion `48.0.0`
2323

24+
### `ListingOptions` default for `collect_stat` changed from `true` to `false`
25+
26+
This makes it agree with the default for `SessionConfig`.
27+
Most users won't be impacted by this change but if you were using `ListingOptions` directly
28+
and relied on the default value of `collect_stat` being `true`, you will need to
29+
explicitly set it to `true` in your code.
30+
31+
```rust
32+
# /* comment to avoid running
33+
ListingOptions::new(Arc::new(ParquetFormat::default()))
34+
.with_collect_stat(true)
35+
// other options
36+
# */
37+
```
38+
2439
### Processing `Field` instead of `DataType` for user defined functions
2540

2641
In order to support metadata handling and extension types, user defined functions are

0 commit comments

Comments
 (0)