diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 57c9c578096f..4bc37f0ae984 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -396,7 +396,8 @@ async fn get_table( } "parquet" => { let path = format!("{}/{}", path, table); - let format = ParquetFormat::default().with_enable_pruning(true); + let format = ParquetFormat::new(ctx.config_options()) + .with_enable_pruning(Some(true)); (Arc::new(format), path, DEFAULT_PARQUET_EXTENSION) } diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs index 88cc6f1c23be..d10f11a43d22 100644 --- a/datafusion-examples/examples/flight_server.rs +++ b/datafusion-examples/examples/flight_server.rs @@ -67,7 +67,9 @@ impl FlightService for FlightServiceImpl { ) -> Result<Response<SchemaResult>, Status> { let request = request.into_inner(); - let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default())); + let config = SessionConfig::new(); + let listing_options = + ListingOptions::new(Arc::new(ParquetFormat::new(config.config_options()))); let table_path = ListingTableUrl::parse(&request.path[0]).map_err(to_tonic_err)?; diff --git a/datafusion-examples/examples/parquet_sql_multiple_files.rs b/datafusion-examples/examples/parquet_sql_multiple_files.rs index 95bcd48533b9..7e2038c8a3e6 100644 --- a/datafusion-examples/examples/parquet_sql_multiple_files.rs +++ b/datafusion-examples/examples/parquet_sql_multiple_files.rs @@ -32,7 +32,8 @@ async fn main() -> Result<()> { let testdata = datafusion::test_util::parquet_test_data(); // Configure listing options - let file_format = ParquetFormat::default().with_enable_pruning(true); + let file_format = + ParquetFormat::new(ctx.config_options()).with_enable_pruning(Some(true)); let listing_options = ListingOptions::new(Arc::new(file_format)) .with_file_extension(FileType::PARQUET.get_ext()); diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs index 620634f259ba..c738bc0cf27c 100644 --- a/datafusion/core/src/config.rs +++ b/datafusion/core/src/config.rs @@ -61,6 +61,16 @@ pub const OPT_PARQUET_REORDER_FILTERS: &str = pub const OPT_PARQUET_ENABLE_PAGE_INDEX: &str = "datafusion.execution.parquet.enable_page_index"; +/// Configuration option "datafusion.execution.parquet.pruning" +pub const OPT_PARQUET_ENABLE_PRUNING: &str = "datafusion.execution.parquet.pruning"; + +/// Configuration option "datafusion.execution.parquet.skip_metadata" +pub const OPT_PARQUET_SKIP_METADATA: &str = "datafusion.execution.parquet.skip_metadata"; + +/// Configuration option "datafusion.execution.parquet.metadata_size_hint" +pub const OPT_PARQUET_METADATA_SIZE_HINT: &str = + "datafusion.execution.parquet.metadata_size_hint"; + /// Configuration option "datafusion.optimizer.skip_failed_rules" pub const OPT_OPTIMIZER_SKIP_FAILED_RULES: &str = "datafusion.optimizer.skip_failed_rules"; @@ -255,6 +265,29 @@ impl BuiltInConfigs { to reduce the number of rows decoded.", false, ), + ConfigDefinition::new_bool( + OPT_PARQUET_ENABLE_PRUNING, + "If true, the parquet reader attempts to skip entire row groups based \ + on the predicate in the query and the metadata (min/max values) stored in \ + the parquet file.", + true, + ), + ConfigDefinition::new_bool( + OPT_PARQUET_SKIP_METADATA, + "If true, the parquet reader skips the optional embedded metadata that may be in \ + the file Schema.
This setting can help avoid schema conflicts when querying \ multiple parquet files with schemas containing compatible types but different metadata.", true, ), + ConfigDefinition::new( + OPT_PARQUET_METADATA_SIZE_HINT, + "If specified, the parquet reader will try to fetch the last `size_hint` \ + bytes of the parquet file optimistically. If not specified, two reads are required: \ + One read to fetch the 8-byte parquet footer and \ + another to fetch the metadata length encoded in the footer.", + DataType::UInt64, + ScalarValue::UInt64(None), + ), ConfigDefinition::new_bool( OPT_OPTIMIZER_SKIP_FAILED_RULES, "When set to true, the logical plan optimizer will produce warning \ @@ -424,6 +457,12 @@ impl ConfigOptions { get_conf_value!(self, UInt64, key, "u64") } + /// get a u64 configuration option as a usize + pub fn get_usize(&self, key: &str) -> Option<usize> { + let v = get_conf_value!(self, UInt64, key, "usize"); + v.and_then(|v| v.try_into().ok()) + } + /// get a string configuration option pub fn get_string(&self, key: &str) -> Option<String> { get_conf_value!(self, Utf8, key, "string") diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index fa9ab13cd115..6e1fa17824fc 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -28,6 +28,7 @@ use datafusion_common::DataFusionError; use datafusion_optimizer::utils::conjunction; use hashbrown::HashMap; use object_store::{ObjectMeta, ObjectStore}; +use parking_lot::RwLock; use parquet::arrow::parquet_to_arrow_schema; use parquet::file::footer::{decode_footer, decode_metadata}; use parquet::file::metadata::ParquetMetaData; @@ -39,6 +40,10 @@ use crate::arrow::array::{ BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, }; use crate::arrow::datatypes::{DataType, Field}; +use crate::config::ConfigOptions; +use crate::config::OPT_PARQUET_ENABLE_PRUNING; +use crate::config::OPT_PARQUET_METADATA_SIZE_HINT; +use crate::config::OPT_PARQUET_SKIP_METADATA; use crate::datasource::{create_max_min_accs, get_col_stats}; use crate::error::Result; use crate::logical_expr::Expr; @@ -52,51 +57,69 @@ pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet"; /// The Apache Parquet `FileFormat` implementation #[derive(Debug)] pub struct ParquetFormat { - enable_pruning: bool, + // Session level configuration + config_options: Arc<RwLock<ConfigOptions>>, + // local overrides + enable_pruning: Option<bool>, metadata_size_hint: Option<usize>, - skip_metadata: bool, + skip_metadata: Option<bool>, } -impl Default for ParquetFormat { - fn default() -> Self { +impl ParquetFormat { + /// construct a new Format with the specified `ConfigOptions` + pub fn new(config_options: Arc<RwLock<ConfigOptions>>) -> Self { Self { - enable_pruning: true, + config_options, + enable_pruning: None, metadata_size_hint: None, - skip_metadata: true, + skip_metadata: None, } } -} -impl ParquetFormat { /// Activate statistics based row group level pruning - /// - defaults to true - pub fn with_enable_pruning(mut self, enable: bool) -> Self { + /// - If None, defaults to value on `config_options` + pub fn with_enable_pruning(mut self, enable: Option<bool>) -> Self { self.enable_pruning = enable; self } + /// Return true if pruning is enabled + pub fn enable_pruning(&self) -> bool { + self.enable_pruning + .or_else(|| { + self.config_options + .read() + .get_bool(OPT_PARQUET_ENABLE_PRUNING) + }) + .unwrap_or(true) + } + /// Provide a hint to the size of the file metadata.
If a hint is provided /// the reader will try to fetch the last `size_hint` bytes of the parquet file optimistically. /// Without a hint, two reads are required. One read to fetch the 8-byte parquet footer and then /// another read to fetch the metadata length encoded in the footer. - pub fn with_metadata_size_hint(mut self, size_hint: usize) -> Self { - self.metadata_size_hint = Some(size_hint); + /// + /// - If None, defaults to value on `config_options` + pub fn with_metadata_size_hint(mut self, size_hint: Option<usize>) -> Self { + self.metadata_size_hint = size_hint; self } - /// Return true if pruning is enabled - pub fn enable_pruning(&self) -> bool { - self.enable_pruning - } /// Return the metadata size hint if set pub fn metadata_size_hint(&self) -> Option<usize> { - self.metadata_size_hint + self.metadata_size_hint.or_else(|| { + self.config_options + .read() + .get_usize(OPT_PARQUET_METADATA_SIZE_HINT) + }) } /// Tell the parquet reader to skip any metadata that may be in /// the file Schema. This can help avoid schema conflicts due to - /// metadata. Defaults to true. - pub fn with_skip_metadata(mut self, skip_metadata: bool) -> Self { + /// metadata. + /// + /// - If None, defaults to value on `config_options` + pub fn with_skip_metadata(mut self, skip_metadata: Option<bool>) -> Self { self.skip_metadata = skip_metadata; self } @@ -105,6 +128,12 @@ impl ParquetFormat { /// schema merging. pub fn skip_metadata(&self) -> bool { self.skip_metadata + .or_else(|| { + self.config_options + .read() + .get_bool(OPT_PARQUET_SKIP_METADATA) + }) + .unwrap_or(true) } } @@ -143,7 +172,7 @@ impl FileFormat for ParquetFormat { schemas.push(schema) } - let schema = if self.skip_metadata { + let schema = if self.skip_metadata() { Schema::try_merge(clear_metadata(schemas)) } else { Schema::try_merge(schemas) @@ -176,7 +205,7 @@ impl FileFormat for ParquetFormat { // If pruning is enabled, combine the filters to build the predicate. // If pruning is disabled, set the predicate to None; readers // will not prune data based on the statistics.
- let predicate = if self.enable_pruning { + let predicate = if self.enable_pruning() { conjunction(filters.to_vec()) } else { None }; @@ -620,7 +649,8 @@ mod tests { let store = Arc::new(LocalFileSystem::new()) as _; let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?; - let format = ParquetFormat::default(); + let ctx = SessionContext::new(); + let format = ParquetFormat::new(ctx.config_options()); let schema = format.infer_schema(&store, &meta).await.unwrap(); let stats = @@ -767,7 +797,9 @@ mod tests { assert_eq!(store.request_count(), 2); - let format = ParquetFormat::default().with_metadata_size_hint(9); + let ctx = SessionContext::new(); + let format = + ParquetFormat::new(ctx.config_options()).with_metadata_size_hint(Some(9)); let schema = format.infer_schema(&store.upcast(), &meta).await.unwrap(); let stats = @@ -794,7 +826,9 @@ mod tests { // ensure the requests were coalesced into a single request assert_eq!(store.request_count(), 1); - let format = ParquetFormat::default().with_metadata_size_hint(size_hint); + let ctx = SessionContext::new(); + let format = ParquetFormat::new(ctx.config_options()) + .with_metadata_size_hint(Some(size_hint)); let schema = format.infer_schema(&store.upcast(), &meta).await.unwrap(); let stats = fetch_statistics( store.upcast().as_ref(), @@ -831,7 +865,7 @@ mod tests { let config = SessionConfig::new().with_batch_size(2); let ctx = SessionContext::with_config(config); let projection = None; - let exec = get_exec("alltypes_plain.parquet", projection, None).await?; + let exec = get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?; let task_ctx = ctx.task_ctx(); let stream = exec.execute(0, task_ctx)?; @@ -860,11 +894,12 @@ // Read the full file let projection = None; - let exec = get_exec("alltypes_plain.parquet", projection, None).await?; + let exec = get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?; // Read only one column. This should scan less data.
let projection = Some(vec![0]); - let exec_projected = get_exec("alltypes_plain.parquet", projection, None).await?; + let exec_projected = + get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?; let task_ctx = ctx.task_ctx(); @@ -882,7 +917,8 @@ mod tests { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); let projection = None; - let exec = get_exec("alltypes_plain.parquet", projection, Some(1)).await?; + let exec = + get_exec(&session_ctx, "alltypes_plain.parquet", projection, Some(1)).await?; // note: even if the limit is set, the executor rounds up to the batch size assert_eq!(exec.statistics().num_rows, Some(8)); @@ -901,7 +937,8 @@ mod tests { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); let projection = None; - let exec = get_exec("alltypes_plain.parquet", projection, None).await?; + let exec = + get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?; let x: Vec<String> = exec .schema() .fields() @@ -939,7 +976,8 @@ mod tests { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); let projection = Some(vec![1]); - let exec = get_exec("alltypes_plain.parquet", projection, None).await?; + let exec = + get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?; let batches = collect(exec, task_ctx).await?; assert_eq!(1, batches.len()); @@ -965,7 +1003,8 @@ mod tests { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); let projection = Some(vec![0]); - let exec = get_exec("alltypes_plain.parquet", projection, None).await?; + let exec = + get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?; let batches = collect(exec, task_ctx).await?; assert_eq!(1, batches.len()); @@ -988,7 +1027,8 @@ mod tests { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); let projection = Some(vec![10]); - let exec = get_exec("alltypes_plain.parquet", projection, None).await?; + let exec = + get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?; let batches = collect(exec, task_ctx).await?; assert_eq!(1, batches.len()); @@ -1011,7 +1051,8 @@ mod tests { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); let projection = Some(vec![6]); - let exec = get_exec("alltypes_plain.parquet", projection, None).await?; + let exec = + get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?; let batches = collect(exec, task_ctx).await?; assert_eq!(1, batches.len()); @@ -1037,7 +1078,8 @@ mod tests { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); let projection = Some(vec![7]); - let exec = get_exec("alltypes_plain.parquet", projection, None).await?; + let exec = + get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?; let batches = collect(exec, task_ctx).await?; assert_eq!(1, batches.len()); @@ -1063,7 +1105,8 @@ mod tests { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); let projection = Some(vec![9]); - let exec = get_exec("alltypes_plain.parquet", projection, None).await?; + let exec = + get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?; let batches = collect(exec, task_ctx).await?; assert_eq!(1, batches.len()); @@ -1090,7 +1133,7 @@ mod tests { let task_ctx = session_ctx.task_ctx(); // parquet uses the int32 as the physical type to store decimal - let exec = get_exec("int32_decimal.parquet", None, None).await?; + let exec = get_exec(&session_ctx,
"int32_decimal.parquet", None, None).await?; let batches = collect(exec, task_ctx.clone()).await?; assert_eq!(1, batches.len()); assert_eq!(1, batches[0].num_columns()); @@ -1098,7 +1141,7 @@ mod tests { assert_eq!(&DataType::Decimal128(4, 2), column.data_type()); // parquet use the int64 as the physical type to store decimal - let exec = get_exec("int64_decimal.parquet", None, None).await?; + let exec = get_exec(&session_ctx, "int64_decimal.parquet", None, None).await?; let batches = collect(exec, task_ctx.clone()).await?; assert_eq!(1, batches.len()); assert_eq!(1, batches[0].num_columns()); @@ -1106,14 +1149,21 @@ mod tests { assert_eq!(&DataType::Decimal128(10, 2), column.data_type()); // parquet use the fixed length binary as the physical type to store decimal - let exec = get_exec("fixed_length_decimal.parquet", None, None).await?; + let exec = + get_exec(&session_ctx, "fixed_length_decimal.parquet", None, None).await?; let batches = collect(exec, task_ctx.clone()).await?; assert_eq!(1, batches.len()); assert_eq!(1, batches[0].num_columns()); let column = batches[0].column(0); assert_eq!(&DataType::Decimal128(25, 2), column.data_type()); - let exec = get_exec("fixed_length_decimal_legacy.parquet", None, None).await?; + let exec = get_exec( + &session_ctx, + "fixed_length_decimal_legacy.parquet", + None, + None, + ) + .await?; let batches = collect(exec, task_ctx.clone()).await?; assert_eq!(1, batches.len()); assert_eq!(1, batches[0].num_columns()); @@ -1123,7 +1173,7 @@ mod tests { // parquet use the fixed length binary as the physical type to store decimal // TODO: arrow-rs don't support convert the physical type of binary to decimal // https://github.com/apache/arrow-rs/pull/2160 - // let exec = get_exec("byte_array_decimal.parquet", None, None).await?; + // let exec = get_exec(&session_ctx, "byte_array_decimal.parquet", None, None).await?; Ok(()) } @@ -1207,12 +1257,13 @@ mod tests { } async fn get_exec( + ctx: &SessionContext, file_name: &str, projection: Option>, limit: Option, ) -> Result> { let testdata = crate::test_util::parquet_test_data(); - let format = ParquetFormat::default(); + let format = ParquetFormat::new(ctx.config_options()); scan_format(&format, &testdata, file_name, projection, limit).await } } diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 374699065630..4ca044f0bf5b 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -28,7 +28,9 @@ use datafusion_physical_expr::PhysicalSortExpr; use futures::{future, stream, StreamExt, TryStreamExt}; use object_store::path::Path; use object_store::ObjectMeta; +use parking_lot::RwLock; +use crate::config::ConfigOptions; use crate::datasource::file_format::file_type::{FileCompressionType, FileType}; use crate::datasource::{ file_format::{ @@ -107,7 +109,10 @@ impl ListingTableConfig { } } - fn infer_format(path: &str) -> Result<(Arc, String)> { + fn infer_format( + config_options: Arc>, + path: &str, + ) -> Result<(Arc, String)> { let err_msg = format!("Unable to infer file type from path: {}", path); let mut exts = path.rsplit('.'); @@ -136,15 +141,15 @@ impl ListingTableConfig { FileType::JSON => Arc::new( JsonFormat::default().with_file_compression_type(file_compression_type), ), - FileType::PARQUET => Arc::new(ParquetFormat::default()), + FileType::PARQUET => Arc::new(ParquetFormat::new(config_options)), }; Ok((file_format, ext)) } /// Infer `ListingOptions` based on `table_path` suffix. 
- pub async fn infer_options(self, ctx: &SessionState) -> Result<Self> { - let store = ctx + pub async fn infer_options(self, state: &SessionState) -> Result<Self> { + let store = state .runtime_env .object_store(self.table_paths.get(0).unwrap())?; @@ -157,12 +162,14 @@ .await .ok_or_else(|| DataFusionError::Internal("No files for table".into()))??; - let (format, file_extension) = - ListingTableConfig::infer_format(file.location.as_ref())?; + let (format, file_extension) = ListingTableConfig::infer_format( + state.config_options(), + file.location.as_ref(), + )?; let listing_options = ListingOptions::new(format) .with_file_extension(file_extension) - .with_target_partitions(ctx.config.target_partitions); + .with_target_partitions(state.config.target_partitions); Ok(Self { table_paths: self.table_paths, @@ -251,11 +258,15 @@ impl ListingOptions { /// Set file extension on [`ListingOptions`] and returns self. /// /// ``` - /// use std::sync::Arc; - /// use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat}; + /// # use std::sync::Arc; + /// # use datafusion::prelude::SessionContext; + /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat}; /// - /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default())) - /// .with_file_extension(".parquet"); + /// let ctx = SessionContext::new(); + /// let listing_options = ListingOptions::new(Arc::new( + /// ParquetFormat::new(ctx.config_options()) + /// )) + /// .with_file_extension(".parquet"); /// /// assert_eq!(listing_options.file_extension, ".parquet"); /// ``` @@ -267,13 +278,17 @@ /// Set table partition column names on [`ListingOptions`] and returns self. /// /// ``` - /// use std::sync::Arc; - /// use arrow::datatypes::DataType; - /// use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat}; + /// # use std::sync::Arc; + /// # use arrow::datatypes::DataType; + /// # use datafusion::prelude::{col, SessionContext}; + /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat}; /// - /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default())) - /// .with_table_partition_cols(vec![("col_a".to_string(), DataType::Utf8), - /// ("col_b".to_string(), DataType::Utf8)]); + /// let ctx = SessionContext::new(); + /// let listing_options = ListingOptions::new(Arc::new( + /// ParquetFormat::new(ctx.config_options()) + /// )) + /// .with_table_partition_cols(vec![("col_a".to_string(), DataType::Utf8), + /// ("col_b".to_string(), DataType::Utf8)]); /// /// assert_eq!(listing_options.table_partition_cols, vec![("col_a".to_string(), DataType::Utf8), /// ("col_b".to_string(), DataType::Utf8)]); @@ -289,12 +304,15 @@ /// Set stat collection on [`ListingOptions`] and returns self.
/// /// ``` - /// use std::sync::Arc; - /// use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat}; + /// # use std::sync::Arc; + /// # use datafusion::prelude::SessionContext; + /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat}; /// - /// // Enable stat collection - /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default())) - /// .with_collect_stat(true); + /// let ctx = SessionContext::new(); + /// let listing_options = ListingOptions::new(Arc::new( + /// ParquetFormat::new(ctx.config_options()) + /// )) + /// .with_collect_stat(true); /// /// assert_eq!(listing_options.collect_stat, true); /// ``` @@ -306,11 +324,15 @@ /// Set number of target partitions on [`ListingOptions`] and returns self. /// /// ``` - /// use std::sync::Arc; - /// use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat}; + /// # use std::sync::Arc; + /// # use datafusion::prelude::SessionContext; + /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat}; /// - /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default())) - /// .with_target_partitions(8); + /// let ctx = SessionContext::new(); + /// let listing_options = ListingOptions::new(Arc::new( + /// ParquetFormat::new(ctx.config_options()) + /// )) + /// .with_target_partitions(8); /// /// assert_eq!(listing_options.target_partitions, 8); /// ``` @@ -322,18 +344,20 @@ /// Set file sort order on [`ListingOptions`] and returns self. /// /// ``` - /// use std::sync::Arc; - /// use datafusion::prelude::{Expr, col}; - /// use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat}; - /// + /// # use std::sync::Arc; + /// # use datafusion::prelude::{col, SessionContext}; + /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat}; /// /// // Tell datafusion that the files are sorted by column "a" /// let file_sort_order = Some(vec![ /// col("a").sort(true, true) /// ]); /// - /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default())) - /// .with_file_sort_order(file_sort_order.clone()); + /// let ctx = SessionContext::new(); + /// let listing_options = ListingOptions::new(Arc::new( + /// ParquetFormat::new(ctx.config_options()) + /// )) + /// .with_file_sort_order(file_sort_order.clone()); /// /// assert_eq!(listing_options.file_sort_order, file_sort_order); /// ``` @@ -709,7 +733,7 @@ mod tests { let ctx = SessionContext::new(); let state = ctx.state(); - let opt = ListingOptions::new(Arc::new(ParquetFormat::default())); + let opt = ListingOptions::new(Arc::new(ParquetFormat::new(ctx.config_options()))); let schema = opt.infer_schema(&state, &table_path).await?; let config = ListingTableConfig::new(table_path) .with_listing_options(opt) @@ -731,7 +755,8 @@ mod tests { let ctx = SessionContext::new(); let state = ctx.state(); - let options = ListingOptions::new(Arc::new(ParquetFormat::default())); + let options = + ListingOptions::new(Arc::new(ParquetFormat::new(ctx.config_options()))); let schema = options.infer_schema(&state, &table_path).await.unwrap(); use physical_plan::expressions::col as physical_col; diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 9a38e1768fbc..537288646707 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -82,7 +82,7 @@ impl TableProviderFactory for ListingTableFactory { .with_delimiter(cmd.delimiter as u8) .with_file_compression_type(file_compression_type), ), - FileType::PARQUET => Arc::new(ParquetFormat::default()), + FileType::PARQUET => Arc::new(ParquetFormat::new(state.config_options())), FileType::AVRO => Arc::new(AvroFormat::default()), FileType::JSON => Arc::new( JsonFormat::default().with_file_compression_type(file_compression_type), diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 2dd270459ca6..6b6a4fb1f6e6 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -21,6 +21,7 @@ use crate::{ catalog::{CatalogList, MemoryCatalogList}, information_schema::CatalogWithInformationSchema, }, + config::OPT_PARQUET_ENABLE_PRUNING, datasource::listing::{ListingOptions, ListingTable}, datasource::{MemTable, ViewTable}, logical_expr::{PlanType, ToStringifiedPlan}, @@ -228,6 +229,11 @@ impl SessionContext { self.state.read().runtime_env.clone() } + /// Return a handle to the shared configuration options + pub fn config_options(&self) -> Arc<RwLock<ConfigOptions>> { + self.state.read().config_options() + } + /// Return the session_id of this Session pub fn session_id(&self) -> String { self.session_id.clone() } @@ -693,9 +699,7 @@ impl SessionContext { options: ParquetReadOptions<'_>, ) -> Result<Arc<DataFrame>> { let table_path = ListingTableUrl::parse(table_path)?; - let target_partitions = self.copied_config().target_partitions; - - let listing_options = options.to_listing_options(target_partitions); + let listing_options = options.to_listing_options(&self.state.read().config); // with parquet we resolve the schema in all cases let resolved_schema = listing_options @@ -814,13 +818,7 @@ impl SessionContext { table_path: &str, options: ParquetReadOptions<'_>, ) -> Result<()> { - let (target_partitions, parquet_pruning) = { - let conf = self.copied_config(); - (conf.target_partitions, conf.parquet_pruning) - }; - let listing_options = options - .parquet_pruning(parquet_pruning) - .to_listing_options(target_partitions); + let listing_options = options.to_listing_options(&self.state.read().config); self.register_listing_table(name, table_path, listing_options, None, None) .await?; @@ -1162,8 +1160,6 @@ pub struct SessionConfig { /// Should DataFusion repartition data using the partition keys to execute window functions in /// parallel using the provided `target_partitions` level pub repartition_windows: bool, - /// Should DataFusion parquet reader using the predicate to prune data - pub parquet_pruning: bool, /// Should DataFusion collect statistics after listing files pub collect_statistics: bool, /// Configuration options @@ -1183,7 +1179,6 @@ impl Default for SessionConfig { repartition_joins: true, repartition_aggregations: true, repartition_windows: true, - parquet_pruning: true, collect_statistics: false, config_options: Arc::new(RwLock::new(ConfigOptions::new())), // Assume no extensions by default.
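Reviewer note: a minimal sketch of how the shared handle added above can be exercised. It assumes only the APIs visible in this diff — `SessionContext::config_options`, `ConfigOptions::set_bool`/`get_bool`, and the `OPT_PARQUET_ENABLE_PRUNING` key — and is not part of the change itself:

```rust
use datafusion::config::OPT_PARQUET_ENABLE_PRUNING;
use datafusion::prelude::SessionContext;

fn main() {
    let ctx = SessionContext::new();

    // The handle is an Arc<RwLock<ConfigOptions>>, so an update made here is
    // visible to every reader created from the same session.
    ctx.config_options()
        .write()
        .set_bool(OPT_PARQUET_ENABLE_PRUNING, false);

    // Read the option back through the same shared handle.
    assert_eq!(
        ctx.config_options()
            .read()
            .get_bool(OPT_PARQUET_ENABLE_PRUNING),
        Some(false)
    );
}
```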
@@ -1287,11 +1282,21 @@ impl SessionConfig { } /// Enables or disables the use of pruning predicate for parquet readers to skip row groups - pub fn with_parquet_pruning(mut self, enabled: bool) -> Self { - self.parquet_pruning = enabled; + pub fn with_parquet_pruning(self, enabled: bool) -> Self { + self.config_options + .write() + .set_bool(OPT_PARQUET_ENABLE_PRUNING, enabled); self } + /// Returns true if pruning predicate should be used to skip parquet row groups + pub fn parquet_pruning(&self) -> bool { + self.config_options + .read() + .get_bool(OPT_PARQUET_ENABLE_PRUNING) + .unwrap_or(false) + } + /// Enables or disables the collection of statistics after listing files pub fn with_collect_statistics(mut self, enabled: bool) -> Self { self.collect_statistics = enabled; @@ -1339,7 +1344,7 @@ ); map.insert( PARQUET_PRUNING.to_owned(), - format!("{}", self.parquet_pruning), + format!("{}", self.parquet_pruning()), ); map.insert( COLLECT_STATISTICS.to_owned(), @@ -1733,6 +1738,11 @@ impl SessionState { let logical_plan = self.optimize(logical_plan)?; planner.create_physical_plan(&logical_plan, self).await } + + /// return the configuration options + pub fn config_options(&self) -> Arc<RwLock<ConfigOptions>> { + self.config.config_options() + } } impl ContextProvider for SessionState { diff --git a/datafusion/core/src/execution/options.rs b/datafusion/core/src/execution/options.rs index 17f59ec7f41d..430d10862456 100644 --- a/datafusion/core/src/execution/options.rs +++ b/datafusion/core/src/execution/options.rs @@ -34,6 +34,8 @@ use crate::datasource::{ listing::ListingOptions, }; +use super::context::SessionConfig; + /// Options that control the reading of CSV files. /// /// Note this structure is supplied when a datasource is created and @@ -167,26 +169,24 @@ pub struct ParquetReadOptions<'a> { pub file_extension: &'a str, /// Partition Columns pub table_partition_cols: Vec<(String, DataType)>, - /// Should DataFusion parquet reader use the predicate to prune data, - /// overridden by value on execution::context::SessionConfig - // TODO move this into ConfigOptions - pub parquet_pruning: bool, - /// Tell the parquet reader to skip any metadata that may be in - /// the file Schema. This can help avoid schema conflicts due to - /// metadata. Defaults to true. - // TODO move this into ConfigOptions - pub skip_metadata: bool, + /// Should the parquet reader use the predicate to prune row groups? + /// If None, uses the value in SessionConfig + pub parquet_pruning: Option<bool>, + /// Should the parquet reader skip any metadata that may be in + /// the file Schema? This can help avoid schema conflicts due to + /// metadata. + /// + /// If None, uses the value in SessionConfig + pub skip_metadata: Option<bool>, } impl<'a> Default for ParquetReadOptions<'a> { fn default() -> Self { - let format_default = ParquetFormat::default(); - Self { file_extension: DEFAULT_PARQUET_EXTENSION, table_partition_cols: vec![], - parquet_pruning: format_default.enable_pruning(), - skip_metadata: format_default.skip_metadata(), + parquet_pruning: None, + skip_metadata: None, } } } @@ -194,7 +194,7 @@ impl<'a> ParquetReadOptions<'a> { /// Specify parquet_pruning pub fn parquet_pruning(mut self, parquet_pruning: bool) -> Self { - self.parquet_pruning = parquet_pruning; + self.parquet_pruning = Some(parquet_pruning); self } /// Tell the parquet reader to skip any metadata that may be in /// the file Schema. This can help avoid schema conflicts due to /// metadata. Defaults to true.
pub fn skip_metadata(mut self, skip_metadata: bool) -> Self { - self.skip_metadata = skip_metadata; + self.skip_metadata = Some(skip_metadata); self } @@ -216,14 +216,14 @@ } /// Helper to convert these user facing options to `ListingTable` options - pub fn to_listing_options(&self, target_partitions: usize) -> ListingOptions { - let file_format = ParquetFormat::default() + pub fn to_listing_options(&self, config: &SessionConfig) -> ListingOptions { + let file_format = ParquetFormat::new(config.config_options()) .with_enable_pruning(self.parquet_pruning) .with_skip_metadata(self.skip_metadata); ListingOptions::new(Arc::new(file_format)) .with_file_extension(self.file_extension) - .with_target_partitions(target_partitions) + .with_target_partitions(config.target_partitions) .with_table_partition_cols(self.table_partition_cols.clone()) } } diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 596d2b09e950..edf72b523e4d 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -72,6 +72,16 @@ use super::get_output_ordering; /// Execution plan for scanning one or more Parquet partitions #[derive(Debug, Clone)] pub struct ParquetExec { + /// Override for `Self::with_pushdown_filters`. If None, uses + /// values from base_config + pushdown_filters: Option<bool>, + /// Override for `Self::with_reorder_filters`. If None, uses + /// values from base_config + reorder_filters: Option<bool>, + /// Override for `Self::with_enable_page_index`. If None, uses + /// values from base_config + enable_page_index: Option<bool>, + /// Base configuration for this scan base_config: FileScanConfig, projected_statistics: Statistics, projected_schema: SchemaRef, @@ -131,6 +141,9 @@ impl ParquetExec { let (projected_schema, projected_statistics) = base_config.project(); Self { + pushdown_filters: None, + reorder_filters: None, + enable_page_index: None, base_config, projected_schema, projected_statistics, @@ -172,20 +185,20 @@ impl ParquetExec { /// `ParquetRecordBatchStream`. These filters are applied by the /// parquet decoder to skip unnecessarily decoding other columns /// which would not pass the predicate. Defaults to false - pub fn with_pushdown_filters(self, pushdown_filters: bool) -> Self { - self.base_config - .config_options - .write() - .set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, pushdown_filters); + pub fn with_pushdown_filters(mut self, pushdown_filters: bool) -> Self { + self.pushdown_filters = Some(pushdown_filters); self } /// Return the value described in [`Self::with_pushdown_filters`] pub fn pushdown_filters(&self) -> bool { - self.base_config - .config_options - .read() - .get_bool(OPT_PARQUET_PUSHDOWN_FILTERS) + self.pushdown_filters + .or_else(|| { + self.base_config + .config_options + .read() + .get_bool(OPT_PARQUET_PUSHDOWN_FILTERS) + }) // default to false .unwrap_or_default() } @@ -194,20 +207,20 @@ impl ParquetExec { /// minimize the cost of filter evaluation by reordering the /// predicate [`Expr`]s. If false, the predicates are applied in /// the same order as specified in the query. Defaults to false.
- pub fn with_reorder_filters(self, reorder_filters: bool) -> Self { - self.base_config - .config_options - .write() - .set_bool(OPT_PARQUET_REORDER_FILTERS, reorder_filters); + pub fn with_reorder_filters(mut self, reorder_filters: bool) -> Self { + self.reorder_filters = Some(reorder_filters); self } /// Return the value described in [`Self::with_reorder_filters`] pub fn reorder_filters(&self) -> bool { - self.base_config - .config_options - .read() - .get_bool(OPT_PARQUET_REORDER_FILTERS) + self.reorder_filters + .or_else(|| { + self.base_config + .config_options + .read() + .get_bool(OPT_PARQUET_REORDER_FILTERS) + }) // default to false .unwrap_or_default() } @@ -216,20 +229,20 @@ impl ParquetExec { /// This is used to optimise filter pushdown /// via `RowSelector` and `RowFilter` by /// eliminating unnecessary IO and decoding - pub fn with_enable_page_index(self, enable_page_index: bool) -> Self { - self.base_config - .config_options - .write() - .set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, enable_page_index); + pub fn with_enable_page_index(mut self, enable_page_index: bool) -> Self { + self.enable_page_index = Some(enable_page_index); self } /// Return the value described in [`Self::with_enable_page_index`] pub fn enable_page_index(&self) -> bool { - self.base_config - .config_options - .read() - .get_bool(OPT_PARQUET_ENABLE_PAGE_INDEX) + self.enable_page_index + .or_else(|| { + self.base_config + .config_options + .read() + .get_bool(OPT_PARQUET_ENABLE_PAGE_INDEX) + }) // default to false .unwrap_or_default() } @@ -1286,7 +1299,8 @@ mod tests { async fn parquet_exec_with_projection() -> Result<()> { let testdata = crate::test_util::parquet_test_data(); let filename = "alltypes_plain.parquet"; - let format = ParquetFormat::default(); + let ctx = SessionContext::new(); + let format = ParquetFormat::new(ctx.config_options()); let parquet_exec = scan_format(&format, &testdata, filename, Some(vec![0, 1, 2]), None) .await @@ -1369,7 +1383,7 @@ mod tests { let meta = local_unpartitioned_file(filename); let store = Arc::new(LocalFileSystem::new()) as _; - let file_schema = ParquetFormat::default() + let file_schema = ParquetFormat::new(session_ctx.config_options()) .infer_schema(&store, &[meta.clone()]) .await?; @@ -1416,7 +1430,7 @@ mod tests { let meta = local_unpartitioned_file(filename); - let schema = ParquetFormat::default() + let schema = ParquetFormat::new(session_ctx.config_options()) .infer_schema(&store, &[meta.clone()]) .await .unwrap(); diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index 788082e37ce1..51c55912d668 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -49,7 +49,7 @@ async fn get_parquet_exec(filter: Expr, session_ctx: SessionContext) -> ParquetE size: metadata.len() as usize, }; - let schema = ParquetFormat::default() + let schema = ParquetFormat::new(session_ctx.config_options()) .infer_schema(&store, &[meta.clone()]) .await .unwrap(); diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs index 41ae758643a9..40009bad634b 100644 --- a/datafusion/core/tests/path_partition.rs +++ b/datafusion/core/tests/path_partition.rs @@ -562,7 +562,7 @@ async fn register_partitioned_alltypes_parquet( MirroringObjectStore::new_arc(parquet_file_path.clone(), store_paths), ); - let options = ListingOptions::new(Arc::new(ParquetFormat::default())) + let options =
ListingOptions::new(Arc::new(ParquetFormat::new(ctx.config_options()))) .with_table_partition_cols( partition_cols .iter() diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs index 1849fc9f8b88..92c47989e877 100644 --- a/datafusion/core/tests/row.rs +++ b/datafusion/core/tests/row.rs @@ -34,7 +34,13 @@ async fn test_with_parquet() -> Result<()> { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); - let exec = get_exec("alltypes_plain.parquet", projection.as_ref(), None).await?; + let exec = get_exec( + &session_ctx, + "alltypes_plain.parquet", + projection.as_ref(), + None, + ) + .await?; let schema = exec.schema().clone(); let batches = collect(exec, task_ctx).await?; @@ -55,7 +61,13 @@ async fn test_with_parquet_word_aligned() -> Result<()> { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7]); - let exec = get_exec("alltypes_plain.parquet", projection.as_ref(), None).await?; + let exec = get_exec( + &session_ctx, + "alltypes_plain.parquet", + projection.as_ref(), + None, + ) + .await?; let schema = exec.schema().clone(); let batches = collect(exec, task_ctx).await?; @@ -72,6 +84,7 @@ } async fn get_exec( + ctx: &SessionContext, file_name: &str, projection: Option<&Vec<usize>>, limit: Option<usize>, ) -> Result<Arc<dyn ExecutionPlan>> { @@ -81,7 +94,7 @@ let path = Path::from_filesystem_path(filename).unwrap(); - let format = ParquetFormat::default(); + let format = ParquetFormat::new(ctx.config_options()); let object_store = Arc::new(LocalFileSystem::new()) as Arc<dyn ObjectStore>; let object_store_url = ObjectStoreUrl::local_filesystem(); diff --git a/datafusion/core/tests/sql/information_schema.rs b/datafusion/core/tests/sql/information_schema.rs index fa2e20569556..75b252ac1e2f 100644 --- a/datafusion/core/tests/sql/information_schema.rs +++ b/datafusion/core/tests/sql/information_schema.rs @@ -707,8 +707,11 @@ async fn show_all() { "| datafusion.execution.coalesce_batches | true |", "| datafusion.execution.coalesce_target_batch_size | 4096 |", "| datafusion.execution.parquet.enable_page_index | false |", + "| datafusion.execution.parquet.metadata_size_hint | NULL |", + "| datafusion.execution.parquet.pruning | true |", "| datafusion.execution.parquet.pushdown_filters | false |", "| datafusion.execution.parquet.reorder_filters | false |", + "| datafusion.execution.parquet.skip_metadata | true |", "| datafusion.execution.time_zone | +00:00 |", "| datafusion.explain.logical_plan_only | false |", "| datafusion.explain.physical_plan_only | false |", diff --git a/datafusion/core/tests/sql/parquet.rs b/datafusion/core/tests/sql/parquet.rs index 18ebf57c0009..e2a33c7c29a1 100644 --- a/datafusion/core/tests/sql/parquet.rs +++ b/datafusion/core/tests/sql/parquet.rs @@ -55,11 +55,11 @@ async fn parquet_query() { /// expressions make it all the way down to the ParquetExec async fn parquet_with_sort_order_specified() { let parquet_read_options = ParquetReadOptions::default(); - let target_partitions = 2; + let session_config = SessionConfig::new().with_target_partitions(2); // The sort order is not specified let options_no_sort = parquet_read_options - .to_listing_options(target_partitions) + .to_listing_options(&session_config) .with_file_sort_order(None); // The sort order is specified (not actually correct in this case) @@ -73,7 +73,7 @@ .collect::<Vec<_>>();
let options_sort = parquet_read_options - .to_listing_options(target_partitions) + .to_listing_options(&session_config) .with_file_sort_order(Some(file_sort_order)); // This string appears in ParquetExec if the output ordering is diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs index 346143348fb7..78db01313b36 100644 --- a/datafusion/proto/src/logical_plan.rs +++ b/datafusion/proto/src/logical_plan.rs @@ -423,7 +423,8 @@ impl AsLogicalPlan for LogicalPlanNode { &FileFormatType::Parquet(protobuf::ParquetFormat { enable_pruning, }) => Arc::new( - ParquetFormat::default().with_enable_pruning(enable_pruning), + ParquetFormat::new(ctx.config_options()) + .with_enable_pruning(Some(enable_pruning)), ), FileFormatType::Csv(protobuf::CsvFormat { has_header, diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index a42050c757d9..8bb31643337f 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -43,8 +43,11 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.coalesce_batches | Boolean | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting 'datafusion.execution.coalesce_target_batch_size'. | | datafusion.execution.coalesce_target_batch_size | UInt64 | 4096 | Target batch size when coalescing batches. Used in conjunction with the configuration setting 'datafusion.execution.coalesce_batches'. | | datafusion.execution.parquet.enable_page_index | Boolean | false | If true, uses parquet data page level metadata (Page Index) statistics to reduce the number of rows decoded. | +| datafusion.execution.parquet.metadata_size_hint | UInt64 | NULL | If specified, the parquet reader will try to fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer. | +| datafusion.execution.parquet.pruning | Boolean | true | If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file. | | datafusion.execution.parquet.pushdown_filters | Boolean | false | If true, filter expressions are applied during the parquet decoding operation to reduce the number of rows decoded. | | datafusion.execution.parquet.reorder_filters | Boolean | false | If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query. | +| datafusion.execution.parquet.skip_metadata | Boolean | true | If true, the parquet reader skips the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata. | | datafusion.execution.time_zone | Utf8 | +00:00 | The session time zone, which some functions require, e.g. EXTRACT(HOUR FROM some_time) shifts the underlying datetime according to the time zone and then extracts the hour. |
| datafusion.explain.logical_plan_only | Boolean | false | When set to true, the explain statement will only print logical plans. |
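To make the override semantics above concrete, here is a hedged end-to-end sketch (not part of the diff; it assumes the `ParquetFormat::new`, `with_enable_pruning`, and `SessionConfig::with_parquet_pruning` APIs introduced in this PR):

```rust
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Session-wide default: turn row group pruning off through the new
    // "datafusion.execution.parquet.pruning" config option.
    let config = SessionConfig::new().with_parquet_pruning(false);
    let ctx = SessionContext::with_config(config);

    // A format with no local override inherits the session value...
    let inherited = ParquetFormat::new(ctx.config_options());
    assert!(!inherited.enable_pruning());

    // ...while a Some(_) local override wins for this format only.
    let overridden =
        ParquetFormat::new(ctx.config_options()).with_enable_pruning(Some(true));
    assert!(overridden.enable_pruning());
}
```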