Commit 0e03bdc

fix, add test

1 parent 7301d4e commit 0e03bdc

8 files changed: +280 -37 lines changed

datafusion/core/src/datasource/physical_plan/arrow_file.rs
Lines changed: 6 additions & 1 deletion

@@ -31,6 +31,7 @@ use datafusion_common::{Constraints, Statistics};
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfig;
 use datafusion_datasource::source::DataSourceExec;
+use datafusion_datasource::PartitionedFile;
 use datafusion_datasource_json::source::JsonSource;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_physical_expr::{EquivalenceProperties, Partitioning};

@@ -269,7 +270,11 @@ pub struct ArrowOpener {
 }
 
 impl FileOpener for ArrowOpener {
-    fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> Result<FileOpenFuture> {
+    fn open(
+        &self,
+        file_meta: FileMeta,
+        _file: PartitionedFile,
+    ) -> Result<FileOpenFuture> {
         let object_store = Arc::clone(&self.object_store);
         let projection = self.projection.clone();
         Ok(Box::pin(async move {
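Every implementation below repeats the same mechanical change: `FileOpener::open` now receives the `PartitionedFile` as a second argument, and openers that do not need it bind it as `_file`. A self-contained sketch of the pattern, using local stand-in types rather than DataFusion's real ones:

// Sketch only: FileMeta, PartitionedFile, and FileOpenFuture here are
// placeholder unit structs, not DataFusion's actual definitions.
struct FileMeta;
struct PartitionedFile;
struct FileOpenFuture;
type Result<T> = std::result::Result<T, String>;

trait FileOpener {
    fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> Result<FileOpenFuture>;
}

struct ArrowLikeOpener;

impl FileOpener for ArrowLikeOpener {
    fn open(
        &self,
        _file_meta: FileMeta,
        _file: PartitionedFile, // accepted but unused, hence the underscore
    ) -> Result<FileOpenFuture> {
        Ok(FileOpenFuture)
    }
}

fn main() {
    assert!(ArrowLikeOpener.open(FileMeta, PartitionedFile).is_ok());
}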

datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
Lines changed: 5 additions & 1 deletion

@@ -67,7 +67,11 @@ pub struct TestOpener {
 }
 
 impl FileOpener for TestOpener {
-    fn open(&self, _file_meta: FileMeta, file: PartitionedFile) -> Result<FileOpenFuture> {
+    fn open(
+        &self,
+        _file_meta: FileMeta,
+        file: PartitionedFile,
+    ) -> Result<FileOpenFuture> {
         let mut batches = self.batches.clone();
         if let Some(batch_size) = self.batch_size {
             let batch = concat_batches(&batches[0].schema(), &batches)?;

datafusion/datasource-avro/src/source.rs
Lines changed: 8 additions & 2 deletions

@@ -129,7 +129,9 @@ mod private {
     use super::*;
 
     use bytes::Buf;
-    use datafusion_datasource::{file_meta::FileMeta, file_stream::FileOpenFuture, PartitionedFile};
+    use datafusion_datasource::{
+        file_meta::FileMeta, file_stream::FileOpenFuture, PartitionedFile,
+    };
     use futures::StreamExt;
     use object_store::{GetResultPayload, ObjectStore};

@@ -139,7 +141,11 @@ mod private {
     }
 
    impl FileOpener for AvroOpener {
-        fn open(&self, file_meta: FileMeta, _file: PartitionedFile) -> Result<FileOpenFuture> {
+        fn open(
+            &self,
+            file_meta: FileMeta,
+            _file: PartitionedFile,
+        ) -> Result<FileOpenFuture> {
            let config = Arc::clone(&self.config);
            let object_store = Arc::clone(&self.object_store);
            Ok(Box::pin(async move {

datafusion/datasource-csv/src/source.rs
Lines changed: 6 additions & 2 deletions

@@ -28,7 +28,7 @@ use datafusion_datasource::file_compression_type::FileCompressionType;
 use datafusion_datasource::file_meta::FileMeta;
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
 use datafusion_datasource::{
-    calculate_range, FileRange, ListingTableUrl, PartitionedFile, RangeCalculation
+    calculate_range, FileRange, ListingTableUrl, PartitionedFile, RangeCalculation,
 };
 
 use arrow::csv;

@@ -300,7 +300,11 @@ impl FileOpener for CsvOpener {
     /// A,1,2,3,4,5,6,7,8,9\n
     /// A},1,2,3,4,5,6,7,8,9\n
     /// The lines read would be: [1, 2]
-    fn open(&self, file_meta: FileMeta, _file: PartitionedFile) -> Result<FileOpenFuture> {
+    fn open(
+        &self,
+        file_meta: FileMeta,
+        _file: PartitionedFile,
+    ) -> Result<FileOpenFuture> {
         // `self.config.has_header` controls whether to skip reading the 1st line header
         // If the .csv file is read in parallel and this `CsvOpener` is only reading some middle
         // partition, then don't skip first line
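The comment at the end of this hunk encodes a small but important rule for parallel CSV scans: only the partition that starts at the beginning of the file should treat its first line as a header to skip. A hypothetical distillation of that rule (not the crate's actual logic):

// Hypothetical helper for illustration: only the partition starting at
// byte 0 of the file sees (and skips) the header line.
fn should_skip_header(has_header: bool, partition_start_byte: u64) -> bool {
    has_header && partition_start_byte == 0
}

fn main() {
    assert!(should_skip_header(true, 0)); // first partition: skip the header
    assert!(!should_skip_header(true, 512)); // middle partition: keep every line
    assert!(!should_skip_header(false, 0)); // headerless file: nothing to skip
}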

datafusion/datasource-json/src/source.rs
Lines changed: 8 additions & 2 deletions

@@ -30,7 +30,9 @@ use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer};
 use datafusion_datasource::file_compression_type::FileCompressionType;
 use datafusion_datasource::file_meta::FileMeta;
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
-use datafusion_datasource::{calculate_range, ListingTableUrl, PartitionedFile, RangeCalculation};
+use datafusion_datasource::{
+    calculate_range, ListingTableUrl, PartitionedFile, RangeCalculation,
+};
 use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 
 use arrow::json::ReaderBuilder;

@@ -328,7 +330,11 @@ impl FileOpener for JsonOpener {
     /// are applied to determine which lines to read:
     /// 1. The first line of the partition is the line in which the index of the first character >= `start`.
     /// 2. The last line of the partition is the line in which the byte at position `end - 1` resides.
-    fn open(&self, file_meta: FileMeta, _file: PartitionedFile) -> Result<FileOpenFuture> {
+    fn open(
+        &self,
+        file_meta: FileMeta,
+        _file: PartitionedFile,
+    ) -> Result<FileOpenFuture> {
         let store = Arc::clone(&self.object_store);
         let schema = Arc::clone(&self.projected_schema);
         let batch_size = self.batch_size;
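The two rules in the doc comment determine which newline-delimited records belong to a byte-range partition, so that adjacent partitions never read the same line twice. A toy model of those rules (assumed semantics for illustration, not the crate's `calculate_range` code):

// Each line occupies [line_start, line_end) in byte offsets; a partition
// [start, end) reads exactly the lines whose first byte falls inside it.
fn lines_for_partition(line_bounds: &[(usize, usize)], start: usize, end: usize) -> Vec<usize> {
    line_bounds
        .iter()
        .enumerate()
        // Rule 1: the first selected line begins at or after `start`.
        // Rule 2: the last selected line holds byte `end - 1`, so every
        // selected line starts strictly before `end`.
        .filter(|&(_, &(line_start, _))| line_start >= start && line_start < end)
        .map(|(i, _)| i)
        .collect()
}

fn main() {
    // Three 10-byte lines: [0,10), [10,20), [20,30).
    let bounds = [(0, 10), (10, 20), (20, 30)];
    // Partition [5, 25): line 1 is the first to start at/after byte 5,
    // and byte 24 lives in line 2 -> lines 1 and 2 are read.
    assert_eq!(lines_for_partition(&bounds, 5, 25), vec![1, 2]);
}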

datafusion/datasource-parquet/src/opener.rs
Lines changed: 158 additions & 5 deletions

@@ -92,24 +92,35 @@ impl FileOpener for ParquetOpener {
         // Prune using known statistics
         match (&file.statistics, &self.predicate) {
             (Some(stats), Some(predicate)) => {
-                let pruning_predicate =
-                    build_pruning_predicate(Arc::clone(predicate), &self.table_schema, &predicate_creation_errors);
+                let pruning_predicate = build_pruning_predicate(
+                    Arc::clone(predicate),
+                    &self.table_schema,
+                    &predicate_creation_errors,
+                );
                 if let Some(pruning_predicate) = pruning_predicate {
                     let pruning_stats = PrunableStatistics::new(
-                        Arc::clone(stats),
+                        vec![Arc::clone(stats)],
                         Arc::clone(&self.table_schema),
                     );
                     match pruning_predicate.prune(&pruning_stats) {
                         Ok(values) => {
                             // We expect a single container -> if all containers are false skip this file
                             if values.into_iter().all(|v| !v) {
                                 // Return an empty stream
-                                todo!()
+                                return Ok(Box::pin(async move {
+                                    Ok(futures::stream::empty().boxed())
+                                }));
                             }
                         }
                         // stats filter array could not be built, so we can't prune
                         Err(e) => {
-                            log::debug!("Error evaluating row group predicate values {e}");
+                            let err = format!(
+                                "Error evaluating row group predicate values {e}"
+                            );
+                            println!("{err}");
+                            log::debug!(
+                                "Error evaluating row group predicate values {e}"
+                            );
                             predicate_creation_errors.add(1);
                         }
                     }
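The fix replaces the leftover `todo!()` with an early return of an empty stream once the pruning predicate proves the file irrelevant. The decision itself hinges on `prune` returning one boolean per container (here, one per file), where `true` means the container may hold matching rows. That skip condition, restated as a minimal standalone check (plain `Vec<bool>` standing in for the predicate's container results):

fn can_skip_file(container_results: Vec<bool>) -> bool {
    // A file is skippable only when every container is provably false,
    // i.e. no container may hold rows matching the predicate.
    container_results.into_iter().all(|v| !v)
}

fn main() {
    assert!(can_skip_file(vec![false])); // predicate can never match -> skip
    assert!(!can_skip_file(vec![true])); // rows may match -> must read
    assert!(!can_skip_file(vec![false, true])); // any maybe-match forces a read
}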
@@ -469,3 +480,145 @@ async fn load_page_index<T: AsyncFileReader>(
         Ok(reader_metadata)
     }
 }
+
+#[cfg(test)]
+mod test {
+    use std::sync::Arc;
+
+    use bytes::{BufMut, BytesMut};
+    use chrono::Utc;
+    use datafusion_common::{
+        record_batch, stats::Precision, ColumnStatistics, ScalarValue, Statistics,
+    };
+    use datafusion_datasource::{
+        file_meta::FileMeta, file_stream::FileOpener,
+        schema_adapter::DefaultSchemaAdapterFactory, PartitionedFile,
+    };
+    use datafusion_expr::{col, lit};
+    use datafusion_physical_expr::planner::logical2physical;
+    use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+    use futures::{Stream, StreamExt};
+    use object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore};
+    use parquet::arrow::ArrowWriter;
+
+    use crate::{opener::ParquetOpener, DefaultParquetFileReaderFactory};
+
+    async fn count_batches_and_rows(
+        mut stream: std::pin::Pin<
+            Box<
+                dyn Stream<
+                        Item = Result<
+                            arrow::array::RecordBatch,
+                            arrow::error::ArrowError,
+                        >,
+                    > + Send,
+            >,
+        >,
+    ) -> (usize, usize) {
+        let mut num_batches = 0;
+        let mut num_rows = 0;
+        while let Some(Ok(batch)) = stream.next().await {
+            num_rows += batch.num_rows();
+            num_batches += 1;
+        }
+        (num_batches, num_rows)
+    }
+
+    #[tokio::test]
+    async fn test_prune_based_on_statistics() {
+        let batch = record_batch!(
+            ("a", Int32, vec![Some(1), Some(2), Some(2)]),
+            ("b", Float32, vec![Some(1.0), Some(2.0), None])
+        )
+        .unwrap();
+
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+        let mut out = BytesMut::new().writer();
+        {
+            let mut writer =
+                ArrowWriter::try_new(&mut out, batch.schema(), None).unwrap();
+            writer.write(&batch).unwrap();
+            writer.finish().unwrap();
+        }
+        let data = out.into_inner().freeze();
+        let data_size = data.len();
+        store
+            .put(&Path::from("test.parquet"), data.into())
+            .await
+            .unwrap();
+
+        let schema = batch.schema();
+        let file = PartitionedFile::new(
+            "file.parquet".to_string(),
+            u64::try_from(data_size).unwrap(),
+        )
+        .with_statistics(Arc::new(
+            Statistics::new_unknown(&schema)
+                .add_column_statistics(ColumnStatistics::new_unknown())
+                .add_column_statistics(
+                    ColumnStatistics::new_unknown()
+                        .with_min_value(Precision::Exact(ScalarValue::Float32(Some(1.0))))
+                        .with_max_value(Precision::Exact(ScalarValue::Float32(Some(2.0))))
+                        .with_null_count(Precision::Exact(1)),
+                ),
+        ));
+
+        let make_opener = |predicate| {
+            ParquetOpener {
+                partition_index: 0,
+                projection: Arc::new([0, 1]),
+                batch_size: 1024,
+                limit: None,
+                predicate: Some(predicate),
+                table_schema: schema.clone(),
+                metadata_size_hint: None,
+                metrics: ExecutionPlanMetricsSet::new(),
+                parquet_file_reader_factory: Arc::new(
+                    DefaultParquetFileReaderFactory::new(Arc::clone(&store)),
+                ),
+                pushdown_filters: false, // note that this is false!
+                reorder_filters: false,
+                enable_page_index: false,
+                enable_bloom_filter: false,
+                schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
+                enable_row_group_stats_pruning: true,
+                coerce_int96: None,
+            }
+        };
+
+        let make_meta = || FileMeta {
+            object_meta: ObjectMeta {
+                location: Path::from("test.parquet"),
+                last_modified: Utc::now(),
+                size: u64::try_from(data_size).unwrap(),
+                e_tag: None,
+                version: None,
+            },
+            range: None,
+            extensions: None,
+            metadata_size_hint: None,
+        };
+
+        // A filter on "a" should not exclude any rows even if it matches the data
+        let expr = col("a").eq(lit(1));
+        let predicate = logical2physical(&expr, &schema);
+        let opener = make_opener(predicate);
+        let stream = opener
+            .open(make_meta(), file.clone())
+            .unwrap()
+            .await
+            .unwrap();
+        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
+        assert_eq!(num_batches, 1);
+        assert_eq!(num_rows, 3);
+
+        // A filter on `b = 5.0` should exclude all rows
+        let expr = col("b").eq(lit(ScalarValue::Float32(Some(5.0))));
+        let predicate = logical2physical(&expr, &schema);
+        let opener = make_opener(predicate);
+        let stream = opener.open(make_meta(), file).unwrap().await.unwrap();
+        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
+        assert_eq!(num_batches, 0);
+        assert_eq!(num_rows, 0);
+    }
+}
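The new test exercises exactly that skip path: the statistics attached to column `b` declare an exact range of [1.0, 2.0] with one null, so `a = 1` cannot prune the file (column `a` has only unknown stats), while `b = 5.0` falls outside the known range and yields zero batches. The interval reasoning behind that outcome, reduced to a standalone check (an illustration, not DataFusion's `PruningPredicate` machinery):

fn may_contain(min: f32, max: f32, target: f32) -> bool {
    // An equality predicate can only match if the target lies in [min, max].
    min <= target && target <= max
}

fn main() {
    assert!(may_contain(1.0, 2.0, 1.0)); // b = 1.0 could match -> file is read
    assert!(!may_contain(1.0, 2.0, 5.0)); // b = 5.0 cannot match -> file pruned
}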

datafusion/datasource/src/file_stream.rs
Lines changed: 5 additions & 1 deletion

@@ -556,7 +556,11 @@ mod tests {
     }
 
     impl FileOpener for TestOpener {
-        fn open(&self, _file_meta: FileMeta, _file: PartitionedFile) -> Result<FileOpenFuture> {
+        fn open(
+            &self,
+            _file_meta: FileMeta,
+            _file: PartitionedFile,
+        ) -> Result<FileOpenFuture> {
            let idx = self.current_idx.fetch_add(1, Ordering::SeqCst);
 
            if self.error_opening_idx.contains(&idx) {
