
Commit 4e084a0

adriangb, zhuqi-lucas, and alamb authored; Nirnay Roy committed
parquet reader: move pruning predicate creation from ParquetSource to ParquetOpener (apache#15561)
* parquet reader: move pruning predicate creation from ParquetSource to ParquetOpener
* use file schema, avoid loading page index if unnecessary
* Add comment
* add comment
* Add comment
* remove check
* fix clippy
* update sqllogictest
* restore to explain plans
* reverted
* modify access
* Fix ArrowReaderOptions should read with physical_file_schema so we do… (apache#17)
* Fix ArrowReaderOptions should read with physical_file_schema so we don't need to cast back to utf8
* Fix fmt
* Update opener.rs
* Always apply per-file schema during parquet read (apache#18)
* Update datafusion/datasource-parquet/src/opener.rs

---------

Co-authored-by: Qi Zhu <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
1 parent 624b667 commit 4e084a0
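
As a rough illustration of the headline change (a hedged sketch, not the actual ParquetOpener code from this commit): the opener is now responsible for building per-file reader state, using the physical schema of each file and only loading the page index when something will use it. The function name and parameters below are made up for this example; only the parquet crate calls (ArrowReaderOptions::with_schema, with_page_index, ParquetRecordBatchReaderBuilder::try_new_with_options) are real APIs.

// Hedged sketch: illustrates "use file schema, avoid loading page index if
// unnecessary" from the commit message. Not the DataFusion ParquetOpener code;
// `open_with_file_schema`, `physical_file_schema`, and `needs_page_index` are
// hypothetical names for this example.
use std::fs::File;
use std::sync::Arc;

use arrow_schema::Schema;
use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};

fn open_with_file_schema(
    file: File,
    physical_file_schema: Arc<Schema>,
    needs_page_index: bool,
) -> parquet::errors::Result<ParquetRecordBatchReaderBuilder<File>> {
    let options = ArrowReaderOptions::new()
        // decode with the schema of this particular file, so no cast back to utf8 is needed
        .with_schema(physical_file_schema)
        // only pay for the page index when page-level pruning will actually use it
        .with_page_index(needs_page_index);
    ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
}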

File tree

6 files changed: +304, -258 lines changed


datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 6 additions & 6 deletions
@@ -67,13 +67,13 @@ pub(crate) mod test_util {
             .into_iter()
             .zip(tmp_files.into_iter())
             .map(|(batch, mut output)| {
-                let builder = parquet::file::properties::WriterProperties::builder();
-                let props = if multi_page {
-                    builder.set_data_page_row_count_limit(ROWS_PER_PAGE)
-                } else {
-                    builder
+                let mut builder = parquet::file::properties::WriterProperties::builder();
+                if multi_page {
+                    builder = builder.set_data_page_row_count_limit(ROWS_PER_PAGE)
                 }
-                .build();
+                builder = builder.set_bloom_filter_enabled(true);
+
+                let props = builder.build();
 
                 let mut writer = parquet::arrow::ArrowWriter::try_new(
                     &mut output,
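
For reference, the new builder flow above reads as a small standalone helper. This is a sketch using only the parquet crate APIs that appear in the hunk; the helper name and the ROWS_PER_PAGE value are stand-ins for the test_util definitions, which are not shown here.

// Sketch of the revised WriterProperties construction shown in the hunk above.
// `writer_props` and the ROWS_PER_PAGE value are illustrative stand-ins.
use parquet::file::properties::WriterProperties;

const ROWS_PER_PAGE: usize = 2;

fn writer_props(multi_page: bool) -> WriterProperties {
    let mut builder = WriterProperties::builder();
    if multi_page {
        // force small data pages so page-index tests see multiple pages per file
        builder = builder.set_data_page_row_count_limit(ROWS_PER_PAGE);
    }
    // always write bloom filters so bloom-filter pruning can be exercised
    builder = builder.set_bloom_filter_enabled(true);
    builder.build()
}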

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 109 additions & 173 deletions
@@ -43,6 +43,7 @@ mod tests {
     };
     use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder};
     use arrow::record_batch::RecordBatch;
+    use arrow::util::pretty::pretty_format_batches;
     use arrow_schema::SchemaRef;
     use bytes::{BufMut, BytesMut};
     use datafusion_common::config::TableParquetOptions;
@@ -61,8 +62,9 @@
     use datafusion_execution::object_store::ObjectStoreUrl;
     use datafusion_expr::{col, lit, when, Expr};
     use datafusion_physical_expr::planner::logical2physical;
+    use datafusion_physical_plan::analyze::AnalyzeExec;
+    use datafusion_physical_plan::collect;
     use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
-    use datafusion_physical_plan::{collect, displayable};
     use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 
     use chrono::{TimeZone, Utc};
@@ -81,10 +83,10 @@
     struct RoundTripResult {
         /// Data that was read back from ParquetFiles
        batches: Result<Vec<RecordBatch>>,
+        /// The EXPLAIN ANALYZE output
+        explain: Result<String>,
         /// The physical plan that was created (that has statistics, etc)
         parquet_exec: Arc<DataSourceExec>,
-        /// The ParquetSource that is used in plan
-        parquet_source: ParquetSource,
     }
 
     /// round-trip record batches by writing each individual RecordBatch to
@@ -137,71 +139,109 @@
             self.round_trip(batches).await.batches
         }
 
-        /// run the test, returning the `RoundTripResult`
-        async fn round_trip(self, batches: Vec<RecordBatch>) -> RoundTripResult {
-            let Self {
-                projection,
-                schema,
-                predicate,
-                pushdown_predicate,
-                page_index_predicate,
-            } = self;
-
-            let file_schema = match schema {
-                Some(schema) => schema,
-                None => Arc::new(
-                    Schema::try_merge(
-                        batches.iter().map(|b| b.schema().as_ref().clone()),
-                    )
-                    .unwrap(),
-                ),
-            };
-            // If testing with page_index_predicate, write parquet
-            // files with multiple pages
-            let multi_page = page_index_predicate;
-            let (meta, _files) = store_parquet(batches, multi_page).await.unwrap();
-            let file_group = meta.into_iter().map(Into::into).collect();
-
+        fn build_file_source(&self, file_schema: SchemaRef) -> Arc<ParquetSource> {
             // set up predicate (this is normally done by a layer higher up)
-            let predicate = predicate.map(|p| logical2physical(&p, &file_schema));
+            let predicate = self
+                .predicate
+                .as_ref()
+                .map(|p| logical2physical(p, &file_schema));
 
             let mut source = ParquetSource::default();
             if let Some(predicate) = predicate {
                 source = source.with_predicate(Arc::clone(&file_schema), predicate);
             }
 
-            if pushdown_predicate {
+            if self.pushdown_predicate {
                 source = source
                     .with_pushdown_filters(true)
                     .with_reorder_filters(true);
             }
 
-            if page_index_predicate {
+            if self.page_index_predicate {
                 source = source.with_enable_page_index(true);
             }
 
+            Arc::new(source)
+        }
+
+        fn build_parquet_exec(
+            &self,
+            file_schema: SchemaRef,
+            file_group: FileGroup,
+            source: Arc<ParquetSource>,
+        ) -> Arc<DataSourceExec> {
             let base_config = FileScanConfigBuilder::new(
                 ObjectStoreUrl::local_filesystem(),
                 file_schema,
-                Arc::new(source.clone()),
+                source,
             )
             .with_file_group(file_group)
-            .with_projection(projection)
+            .with_projection(self.projection.clone())
             .build();
+            DataSourceExec::from_data_source(base_config)
+        }
+
+        /// run the test, returning the `RoundTripResult`
+        async fn round_trip(&self, batches: Vec<RecordBatch>) -> RoundTripResult {
+            let file_schema = match &self.schema {
+                Some(schema) => schema,
+                None => &Arc::new(
+                    Schema::try_merge(
+                        batches.iter().map(|b| b.schema().as_ref().clone()),
+                    )
+                    .unwrap(),
+                ),
+            };
+            let file_schema = Arc::clone(file_schema);
+            // If testing with page_index_predicate, write parquet
+            // files with multiple pages
+            let multi_page = self.page_index_predicate;
+            let (meta, _files) = store_parquet(batches, multi_page).await.unwrap();
+            let file_group: FileGroup = meta.into_iter().map(Into::into).collect();
+
+            // build a ParquetExec to return the results
+            let parquet_source = self.build_file_source(file_schema.clone());
+            let parquet_exec = self.build_parquet_exec(
+                file_schema.clone(),
+                file_group.clone(),
+                Arc::clone(&parquet_source),
+            );
+
+            let analyze_exec = Arc::new(AnalyzeExec::new(
+                false,
+                false,
+                // use a new ParquetSource to avoid sharing execution metrics
+                self.build_parquet_exec(
+                    file_schema.clone(),
+                    file_group.clone(),
+                    self.build_file_source(file_schema.clone()),
+                ),
+                Arc::new(Schema::new(vec![
+                    Field::new("plan_type", DataType::Utf8, true),
+                    Field::new("plan", DataType::Utf8, true),
+                ])),
+            ));
 
             let session_ctx = SessionContext::new();
             let task_ctx = session_ctx.task_ctx();
 
-            let parquet_exec = DataSourceExec::from_data_source(base_config.clone());
+            let batches = collect(
+                Arc::clone(&parquet_exec) as Arc<dyn ExecutionPlan>,
+                task_ctx.clone(),
+            )
+            .await;
+
+            let explain = collect(analyze_exec, task_ctx.clone())
+                .await
+                .map(|batches| {
+                    let batches = pretty_format_batches(&batches).unwrap();
+                    format!("{batches}")
+                });
+
             RoundTripResult {
-                batches: collect(parquet_exec.clone(), task_ctx).await,
+                batches,
+                explain,
                 parquet_exec,
-                parquet_source: base_config
-                    .file_source()
-                    .as_any()
-                    .downcast_ref::<ParquetSource>()
-                    .unwrap()
-                    .clone(),
             }
         }
     }
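
With the harness change above, tests assert against the rendered EXPLAIN ANALYZE text instead of downcasting the file source to ParquetSource. A minimal sketch of that assertion style, assuming only arrow APIs; `assert_explain_contains` is an illustrative helper, not part of the patch, which does the same check inline with assert_contains!.

// Illustrative helper (not in the patch): render EXPLAIN ANALYZE batches to a
// string and check that a metric such as "row_groups_pruned_bloom_filter=1"
// appears, mirroring what the updated tests do with assert_contains!.
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;

fn assert_explain_contains(explain_batches: &[RecordBatch], needle: &str) {
    let text = pretty_format_batches(explain_batches).unwrap().to_string();
    assert!(
        text.contains(needle),
        "expected EXPLAIN ANALYZE output to contain {needle:?}, got:\n{text}"
    );
}
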
@@ -1375,26 +1415,6 @@
         create_batch(vec![("c1", c1.clone())])
     }
 
-    /// Returns a int64 array with contents:
-    /// "[-1, 1, null, 2, 3, null, null]"
-    fn int64_batch() -> RecordBatch {
-        let contents: ArrayRef = Arc::new(Int64Array::from(vec![
-            Some(-1),
-            Some(1),
-            None,
-            Some(2),
-            Some(3),
-            None,
-            None,
-        ]));
-
-        create_batch(vec![
-            ("a", contents.clone()),
-            ("b", contents.clone()),
-            ("c", contents.clone()),
-        ])
-    }
-
     #[tokio::test]
     async fn parquet_exec_metrics() {
         // batch1: c1(string)
@@ -1454,110 +1474,17 @@
             .round_trip(vec![batch1])
             .await;
 
-        // should have a pruning predicate
-        let pruning_predicate = rt.parquet_source.pruning_predicate();
-        assert!(pruning_predicate.is_some());
-
-        // convert to explain plan form
-        let display = displayable(rt.parquet_exec.as_ref())
-            .indent(true)
-            .to_string();
-
-        assert_contains!(
-            &display,
-            "pruning_predicate=c1_null_count@2 != row_count@3 AND (c1_min@0 != bar OR bar != c1_max@1)"
-        );
-
-        assert_contains!(&display, r#"predicate=c1@0 != bar"#);
-
-        assert_contains!(&display, "projection=[c1]");
-    }
-
-    #[tokio::test]
-    async fn parquet_exec_display_deterministic() {
-        // batches: a(int64), b(int64), c(int64)
-        let batches = int64_batch();
-
-        fn extract_required_guarantees(s: &str) -> Option<&str> {
-            s.split("required_guarantees=").nth(1)
-        }
-
-        // Ensuring that the required_guarantees remain consistent across every display plan of the filter conditions
-        for _ in 0..100 {
-            // c = 1 AND b = 1 AND a = 1
-            let filter0 = col("c")
-                .eq(lit(1))
-                .and(col("b").eq(lit(1)))
-                .and(col("a").eq(lit(1)));
-
-            let rt0 = RoundTrip::new()
-                .with_predicate(filter0)
-                .with_pushdown_predicate()
-                .round_trip(vec![batches.clone()])
-                .await;
-
-            let pruning_predicate = rt0.parquet_source.pruning_predicate();
-            assert!(pruning_predicate.is_some());
-
-            let display0 = displayable(rt0.parquet_exec.as_ref())
-                .indent(true)
-                .to_string();
-
-            let guarantees0: &str = extract_required_guarantees(&display0)
-                .expect("Failed to extract required_guarantees");
-            // Compare only the required_guarantees part (Because the file_groups part will not be the same)
-            assert_eq!(
-                guarantees0.trim(),
-                "[a in (1), b in (1), c in (1)]",
-                "required_guarantees don't match"
-            );
-        }
+        let explain = rt.explain.unwrap();
 
-        // c = 1 AND a = 1 AND b = 1
-        let filter1 = col("c")
-            .eq(lit(1))
-            .and(col("a").eq(lit(1)))
-            .and(col("b").eq(lit(1)));
+        // check that there was a pruning predicate -> row groups got pruned
+        assert_contains!(&explain, "predicate=c1@0 != bar");
 
-        let rt1 = RoundTrip::new()
-            .with_predicate(filter1)
-            .with_pushdown_predicate()
-            .round_trip(vec![batches.clone()])
-            .await;
-
-        // b = 1 AND a = 1 AND c = 1
-        let filter2 = col("b")
-            .eq(lit(1))
-            .and(col("a").eq(lit(1)))
-            .and(col("c").eq(lit(1)));
+        // there's a single row group, but we can check that it matched
+        // if no pruning was done this would be 0 instead of 1
+        assert_contains!(&explain, "row_groups_matched_statistics=1");
 
-        let rt2 = RoundTrip::new()
-            .with_predicate(filter2)
-            .with_pushdown_predicate()
-            .round_trip(vec![batches])
-            .await;
-
-        // should have a pruning predicate
-        let pruning_predicate = rt1.parquet_source.pruning_predicate();
-        assert!(pruning_predicate.is_some());
-        let pruning_predicate = rt2.parquet_source.predicate();
-        assert!(pruning_predicate.is_some());
-
-        // convert to explain plan form
-        let display1 = displayable(rt1.parquet_exec.as_ref())
-            .indent(true)
-            .to_string();
-        let display2 = displayable(rt2.parquet_exec.as_ref())
-            .indent(true)
-            .to_string();
-
-        let guarantees1 = extract_required_guarantees(&display1)
-            .expect("Failed to extract required_guarantees");
-        let guarantees2 = extract_required_guarantees(&display2)
-            .expect("Failed to extract required_guarantees");
-
-        // Compare only the required_guarantees part (Because the predicate part will not be the same)
-        assert_eq!(guarantees1, guarantees2, "required_guarantees don't match");
+        // check the projection
+        assert_contains!(&explain, "projection=[c1]");
     }
 
     #[tokio::test]
@@ -1581,16 +1508,19 @@
             .await;
 
         // Should not contain a pruning predicate (since nothing can be pruned)
-        let pruning_predicate = rt.parquet_source.pruning_predicate();
-        assert!(
-            pruning_predicate.is_none(),
-            "Still had pruning predicate: {pruning_predicate:?}"
-        );
+        let explain = rt.explain.unwrap();
 
-        // but does still has a pushdown down predicate
-        let predicate = rt.parquet_source.predicate();
-        let filter_phys = logical2physical(&filter, rt.parquet_exec.schema().as_ref());
-        assert_eq!(predicate.unwrap().to_string(), filter_phys.to_string());
+        // When both matched and pruned are 0, it means that the pruning predicate
+        // was not used at all.
+        assert_contains!(&explain, "row_groups_matched_statistics=0");
+        assert_contains!(&explain, "row_groups_pruned_statistics=0");
+
+        // But pushdown predicate should be present
+        assert_contains!(
+            &explain,
+            "predicate=CASE WHEN c1@0 != bar THEN true ELSE false END"
+        );
+        assert_contains!(&explain, "pushdown_rows_pruned=5");
     }
 
     #[tokio::test]
@@ -1616,8 +1546,14 @@
             .await;
 
         // Should have a pruning predicate
-        let pruning_predicate = rt.parquet_source.pruning_predicate();
-        assert!(pruning_predicate.is_some());
+        let explain = rt.explain.unwrap();
+        assert_contains!(
+            &explain,
+            "predicate=c1@0 = foo AND CASE WHEN c1@0 != bar THEN true ELSE false END"
+        );
+
+        // And bloom filters should have been evaluated
+        assert_contains!(&explain, "row_groups_pruned_bloom_filter=1");
     }
 
     /// Returns the sum of all the metrics with the specified name
