
Commit 9815ac6

Handle merged schemas in parquet pruning (#2170)
* Handle merged schemas in parquet pruning
* Handle merged schemas in ListingTable stats collection
* Update datafusion/core/src/physical_plan/file_format/parquet.rs
* Update datafusion/core/src/datasource/file_format/mod.rs
* Update datafusion/core/src/physical_plan/file_format/parquet.rs
* Update datafusion/core/src/physical_plan/file_format/parquet.rs
* Update datafusion/core/src/physical_plan/file_format/parquet.rs
* Add comments and cargo fmt

Co-authored-by: Andrew Lamb <[email protected]>
1 parent ddf29f1 commit 9815ac6

File tree

9 files changed: +317 -55 lines changed

datafusion/core/src/datasource/file_format/avro.rs

Lines changed: 6 additions & 2 deletions
@@ -57,7 +57,11 @@ impl FileFormat for AvroFormat {
         Ok(Arc::new(merged_schema))
     }
 
-    async fn infer_stats(&self, _reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
+    async fn infer_stats(
+        &self,
+        _reader: Arc<dyn ObjectReader>,
+        _table_schema: SchemaRef,
+    ) -> Result<Statistics> {
         Ok(Statistics::default())
     }
 
@@ -367,7 +371,7 @@ mod tests {
             .await
             .expect("Schema inference");
         let statistics = format
-            .infer_stats(local_object_reader(filename.clone()))
+            .infer_stats(local_object_reader(filename.clone()), file_schema.clone())
             .await
             .expect("Stats inference");
         let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];

datafusion/core/src/datasource/file_format/csv.rs

Lines changed: 6 additions & 2 deletions
@@ -120,7 +120,11 @@ impl FileFormat for CsvFormat {
         Ok(Arc::new(merged_schema))
     }
 
-    async fn infer_stats(&self, _reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
+    async fn infer_stats(
+        &self,
+        _reader: Arc<dyn ObjectReader>,
+        _table_schema: SchemaRef,
+    ) -> Result<Statistics> {
         Ok(Statistics::default())
     }
 
@@ -265,7 +269,7 @@ mod tests {
             .await
             .expect("Schema inference");
         let statistics = format
-            .infer_stats(local_object_reader(filename.clone()))
+            .infer_stats(local_object_reader(filename.clone()), file_schema.clone())
             .await
             .expect("Stats inference");
         let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];

datafusion/core/src/datasource/file_format/json.rs

Lines changed: 9 additions & 2 deletions
@@ -92,7 +92,11 @@ impl FileFormat for JsonFormat {
         Ok(Arc::new(schema))
     }
 
-    async fn infer_stats(&self, _reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
+    async fn infer_stats(
+        &self,
+        _reader: Arc<dyn ObjectReader>,
+        _table_schema: SchemaRef,
+    ) -> Result<Statistics> {
         Ok(Statistics::default())
     }
 
@@ -219,7 +223,10 @@ mod tests {
             .await
             .expect("Schema inference");
         let statistics = format
-            .infer_stats(local_object_reader(filename.to_owned()))
+            .infer_stats(
+                local_object_reader(filename.to_owned()),
+                file_schema.clone(),
+            )
             .await
             .expect("Stats inference");
         let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];

datafusion/core/src/datasource/file_format/mod.rs

Lines changed: 10 additions & 1 deletion
@@ -56,7 +56,16 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
 
     /// Infer the statistics for the provided object. The cost and accuracy of the
     /// estimated statistics might vary greatly between file formats.
-    async fn infer_stats(&self, reader: Arc<dyn ObjectReader>) -> Result<Statistics>;
+    ///
+    /// `table_schema` is the (combined) schema of the overall table
+    /// and may be a superset of the schema contained in this file.
+    ///
+    /// TODO: should the file source return statistics for only columns referred to in the table schema?
+    async fn infer_stats(
+        &self,
+        reader: Arc<dyn ObjectReader>,
+        table_schema: SchemaRef,
+    ) -> Result<Statistics>;
 
     /// Take a list of files and convert it to the appropriate executor
     /// according to this file format.
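
The Avro, CSV, and JSON changes above show that conforming to this widened trait method just means accepting, and in their case ignoring, the extra `table_schema` argument. A minimal self-contained sketch of that shape, using stand-in `Schema`, `ObjectReader`, and `Statistics` types rather than the real DataFusion/Arrow ones (the stubs are assumptions for illustration only, not the crate's API):

use std::sync::Arc;

// Stand-ins so the sketch compiles on its own; the real code uses
// arrow::datatypes::SchemaRef, datafusion's Statistics, and ObjectReader.
struct Schema;
type SchemaRef = Arc<Schema>;
#[derive(Debug, Default)]
struct Statistics {
    num_rows: Option<usize>,
}
trait ObjectReader: Send + Sync {}
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

// The widened shape: `table_schema` is the merged schema of the whole table
// and may be a superset of this one file's schema. Formats with no cheap
// statistics accept it and return defaults, exactly as Avro/CSV/JSON do.
async fn infer_stats(
    _reader: Arc<dyn ObjectReader>,
    _table_schema: SchemaRef,
) -> Result<Statistics> {
    Ok(Statistics::default())
}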

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 137 additions & 21 deletions
@@ -25,6 +25,7 @@ use arrow::datatypes::Schema;
 use arrow::datatypes::SchemaRef;
 use async_trait::async_trait;
 use futures::TryStreamExt;
+use hashbrown::HashMap;
 use parquet::arrow::ArrowReader;
 use parquet::arrow::ParquetFileArrowReader;
 use parquet::errors::ParquetError;
@@ -46,7 +47,7 @@ use crate::error::Result;
 use crate::logical_plan::combine_filters;
 use crate::logical_plan::Expr;
 use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
-use crate::physical_plan::file_format::ParquetExec;
+use crate::physical_plan::file_format::{ParquetExec, SchemaAdapter};
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::{Accumulator, Statistics};
 use datafusion_data_access::object_store::{ObjectReader, ObjectReaderStream};
@@ -99,8 +100,12 @@ impl FileFormat for ParquetFormat {
         Ok(Arc::new(merged_schema))
     }
 
-    async fn infer_stats(&self, reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
-        let stats = fetch_statistics(reader)?;
+    async fn infer_stats(
+        &self,
+        reader: Arc<dyn ObjectReader>,
+        table_schema: SchemaRef,
+    ) -> Result<Statistics> {
+        let stats = fetch_statistics(reader, table_schema)?;
         Ok(stats)
     }
 
@@ -279,46 +284,65 @@ fn fetch_schema(object_reader: Arc<dyn ObjectReader>) -> Result<Schema> {
 }
 
 /// Read and parse the statistics of the Parquet file at location `path`
-fn fetch_statistics(object_reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
+fn fetch_statistics(
+    object_reader: Arc<dyn ObjectReader>,
+    table_schema: SchemaRef,
+) -> Result<Statistics> {
     let obj_reader = ChunkObjectReader(object_reader);
     let file_reader = Arc::new(SerializedFileReader::new(obj_reader)?);
     let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
-    let schema = arrow_reader.get_schema()?;
-    let num_fields = schema.fields().len();
-    let fields = schema.fields().to_vec();
+    let file_schema = arrow_reader.get_schema()?;
+    let num_fields = table_schema.fields().len();
+    let fields = table_schema.fields().to_vec();
     let meta_data = arrow_reader.get_metadata();
 
     let mut num_rows = 0;
     let mut total_byte_size = 0;
     let mut null_counts = vec![0; num_fields];
    let mut has_statistics = false;
 
-    let (mut max_values, mut min_values) = create_max_min_accs(&schema);
+    let schema_adapter = SchemaAdapter::new(table_schema.clone());
+
+    let (mut max_values, mut min_values) = create_max_min_accs(&table_schema);
 
     for row_group_meta in meta_data.row_groups() {
         num_rows += row_group_meta.num_rows();
         total_byte_size += row_group_meta.total_byte_size();
 
-        let columns_null_counts = row_group_meta
-            .columns()
-            .iter()
-            .flat_map(|c| c.statistics().map(|stats| stats.null_count()));
-
-        for (i, cnt) in columns_null_counts.enumerate() {
-            null_counts[i] += cnt as usize
-        }
+        let mut column_stats: HashMap<usize, (u64, &ParquetStatistics)> = HashMap::new();
 
         for (i, column) in row_group_meta.columns().iter().enumerate() {
             if let Some(stat) = column.statistics() {
                 has_statistics = true;
-                summarize_min_max(&mut max_values, &mut min_values, &fields, i, stat)
+                column_stats.insert(i, (stat.null_count(), stat));
+            }
+        }
+
+        if has_statistics {
+            for (table_idx, null_cnt) in null_counts.iter_mut().enumerate() {
+                if let Some(file_idx) =
+                    schema_adapter.map_column_index(table_idx, &file_schema)
+                {
+                    if let Some((null_count, stats)) = column_stats.get(&file_idx) {
+                        *null_cnt += *null_count as usize;
+                        summarize_min_max(
+                            &mut max_values,
+                            &mut min_values,
+                            &fields,
+                            table_idx,
+                            stats,
+                        )
+                    }
+                } else {
+                    *null_cnt += num_rows as usize;
+                }
             }
         }
     }
 
     let column_stats = if has_statistics {
         Some(get_col_stats(
-            &schema,
+            &table_schema,
             null_counts,
             &mut max_values,
            &mut min_values,
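
The rewritten loop above is the heart of the fix: each table-schema column index is mapped into this file's schema via `SchemaAdapter::map_column_index`; columns present in the file contribute their recorded null counts and min/max statistics at the table index, while columns absent from the file count as entirely null (`num_rows` nulls per row group). A stripped-down sketch of that widening, with a hypothetical name (`widen_null_counts` is not a DataFusion function):

use std::collections::HashMap;

// Map per-file null counts onto a merged table schema. Columns the file
// lacks are all-null for every row in the file.
fn widen_null_counts(
    table_columns: &[&str],
    file_columns: &[&str],
    file_null_counts: &HashMap<usize, u64>, // keyed by file column index
    num_rows: u64,
) -> Vec<u64> {
    table_columns
        .iter()
        .map(|name| match file_columns.iter().position(|c| c == name) {
            // Present in the file: take the recorded null count.
            Some(file_idx) => *file_null_counts.get(&file_idx).unwrap_or(&0),
            // Absent from the file: every row is null for this column.
            None => num_rows,
        })
        .collect()
}

fn main() {
    // A 3-row file containing only `c1` (1 null), viewed under the merged
    // table schema [c1, c2]: c2 is reported as 3 nulls, matching the
    // `read_merged_batches` test below.
    let file_nulls = HashMap::from([(0, 1)]);
    assert_eq!(
        widen_null_counts(&["c1", "c2"], &["c1"], &file_nulls, 3),
        vec![1, 3]
    );
}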
@@ -369,10 +393,102 @@ mod tests {
 
     use crate::prelude::{SessionConfig, SessionContext};
     use arrow::array::{
-        BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
-        TimestampNanosecondArray,
+        ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
+        StringArray, TimestampNanosecondArray,
     };
+    use arrow::record_batch::RecordBatch;
+    use datafusion_common::ScalarValue;
     use futures::StreamExt;
+    use parquet::arrow::ArrowWriter;
+    use parquet::file::properties::WriterProperties;
+    use tempfile::NamedTempFile;
+
+    // Add a new column with the specified field name to the RecordBatch
+    fn add_to_batch(
+        batch: &RecordBatch,
+        field_name: &str,
+        array: ArrayRef,
+    ) -> RecordBatch {
+        let mut fields = batch.schema().fields().clone();
+        fields.push(Field::new(field_name, array.data_type().clone(), true));
+        let schema = Arc::new(Schema::new(fields));
+
+        let mut columns = batch.columns().to_vec();
+        columns.push(array);
+        RecordBatch::try_new(schema, columns).expect("error; creating record batch")
+    }
+
+    fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch {
+        columns.into_iter().fold(
+            RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
+            |batch, (field_name, arr)| add_to_batch(&batch, field_name, arr.clone()),
+        )
+    }
+
+    async fn create_table(
+        batches: Vec<RecordBatch>,
+    ) -> Result<(Vec<NamedTempFile>, Schema)> {
+        let merged_schema =
+            Schema::try_merge(batches.iter().map(|b| b.schema().as_ref().clone()))?;
+
+        let files: Vec<_> = batches
+            .into_iter()
+            .map(|batch| {
+                let output = tempfile::NamedTempFile::new().expect("creating temp file");
+
+                let props = WriterProperties::builder().build();
+                let file: std::fs::File = (*output.as_file())
+                    .try_clone()
+                    .expect("cloning file descriptor");
+                let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))
+                    .expect("creating writer");
+
+                writer.write(&batch).expect("Writing batch");
+                writer.close().unwrap();
+                output
+            })
+            .collect();
+
+        Ok((files, merged_schema))
+    }
+
+    #[tokio::test]
+    async fn read_merged_batches() -> Result<()> {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let batch1 = create_batch(vec![("c1", c1.clone())]);
+
+        let batch2 = create_batch(vec![("c2", c2)]);
+
+        let (files, schema) = create_table(vec![batch1, batch2]).await?;
+        let table_schema = Arc::new(schema);
+
+        let reader = local_object_reader(files[0].path().to_string_lossy().to_string());
+
+        let stats = fetch_statistics(reader, table_schema.clone())?;
+
+        assert_eq!(stats.num_rows, Some(3));
+        let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
+        let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
+        assert_eq!(c1_stats.null_count, Some(1));
+        assert_eq!(c2_stats.null_count, Some(3));
+
+        let reader = local_object_reader(files[1].path().to_string_lossy().to_string());
+
+        let stats = fetch_statistics(reader, table_schema)?;
+        assert_eq!(stats.num_rows, Some(3));
+        let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
+        let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
+        assert_eq!(c1_stats.null_count, Some(3));
+        assert_eq!(c2_stats.null_count, Some(1));
+        assert_eq!(c2_stats.max_value, Some(ScalarValue::Int64(Some(2))));
+        assert_eq!(c2_stats.min_value, Some(ScalarValue::Int64(Some(1))));
+
+        Ok(())
+    }
 
     #[tokio::test]
     async fn read_small_batches() -> Result<()> {
@@ -645,7 +761,7 @@ mod tests {
             .await
             .expect("Schema inference");
         let statistics = format
-            .infer_stats(local_object_reader(filename.clone()))
+            .infer_stats(local_object_reader(filename.clone()), file_schema.clone())
             .await
             .expect("Stats inference");
         let file_groups = vec![vec![local_unpartitioned_file(filename.clone())]];

datafusion/core/src/datasource/listing/table.rs

Lines changed: 4 additions & 1 deletion
@@ -376,7 +376,10 @@ impl ListingTable {
         let statistics = if self.options.collect_stat {
             let object_reader = object_store
                 .file_reader(part_file.file_meta.sized_file.clone())?;
-            self.options.format.infer_stats(object_reader).await?
+            self.options
+                .format
+                .infer_stats(object_reader, self.file_schema.clone())
+                .await?
         } else {
             Statistics::default()
         };

datafusion/core/src/physical_plan/file_format/mod.rs

Lines changed: 12 additions & 0 deletions
@@ -212,6 +212,18 @@ impl SchemaAdapter {
         Self { table_schema }
     }
 
+    /// Map a column index in the table schema to a column index in a particular
+    /// file schema.
+    ///
+    /// Panics if index is not in range for the table schema.
+    pub(crate) fn map_column_index(
+        &self,
+        index: usize,
+        file_schema: &Schema,
+    ) -> Option<usize> {
+        let field = self.table_schema.field(index);
+        file_schema.index_of(field.name()).ok()
+    }
+
     /// Map projected column indexes to the file schema. This will fail if the table schema
     /// and the file schema contain a field with the same name and different types.
     pub fn map_projections(
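
The new helper is small enough to restate on its own. A self-contained sketch using the arrow crate directly (the free function here is an illustration; in DataFusion the logic lives on `SchemaAdapter` as shown above):

use arrow::datatypes::{DataType, Field, Schema};

// By-name lookup of a table-schema column inside a file schema, returning
// None when the file predates (lacks) the column.
fn map_column_index(
    table_schema: &Schema,
    index: usize,
    file_schema: &Schema,
) -> Option<usize> {
    let field = table_schema.field(index);
    file_schema.index_of(field.name()).ok()
}

fn main() {
    let table_schema = Schema::new(vec![
        Field::new("c1", DataType::Utf8, true),
        Field::new("c2", DataType::Int64, true),
    ]);
    // A file written before `c2` was added to the table.
    let file_schema = Schema::new(vec![Field::new("c1", DataType::Utf8, true)]);

    assert_eq!(map_column_index(&table_schema, 0, &file_schema), Some(0));
    assert_eq!(map_column_index(&table_schema, 1, &file_schema), None);
}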
