Skip to content

Commit 8ffebbe

Browse files
feat: Implement Row Group Index support with predicate pushdown (Phase 3) (#64)
* impl row index * impl predicate evaluate * convert to RowSelector * adapt arrow_reader and async_arrow_reader * fix test and clippy * cargo fmt * test predicate pushdown
1 parent 7ea0ce7 commit 8ffebbe

File tree

10 files changed

+1935
-18
lines changed

10 files changed

+1935
-18
lines changed

src/arrow_reader.rs

Lines changed: 101 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@ use arrow::record_batch::{RecordBatch, RecordBatchReader};
2525

2626
use crate::array_decoder::NaiveStripeDecoder;
2727
use crate::error::Result;
28+
use crate::predicate::Predicate;
2829
use crate::projection::ProjectionMask;
2930
use crate::reader::metadata::{read_metadata, FileMetadata};
3031
use crate::reader::ChunkReader;
32+
use crate::row_group_filter::evaluate_predicate;
3133
use crate::row_selection::RowSelection;
3234
use crate::schema::{ArrowSchemaOptions, RootDataType, TimestampPrecision};
3335
use crate::stripe::{Stripe, StripeMetadata};
@@ -43,6 +45,7 @@ pub struct ArrowReaderBuilder<R> {
4345
pub(crate) file_byte_range: Option<Range<usize>>,
4446
pub(crate) row_selection: Option<RowSelection>,
4547
pub(crate) timestamp_precision: TimestampPrecision,
48+
pub(crate) predicate: Option<Predicate>,
4649
}
4750

4851
impl<R> ArrowReaderBuilder<R> {
@@ -56,6 +59,7 @@ impl<R> ArrowReaderBuilder<R> {
5659
file_byte_range: None,
5760
row_selection: None,
5861
timestamp_precision: TimestampPrecision::default(),
62+
predicate: None,
5963
}
6064
}
6165

@@ -133,6 +137,44 @@ impl<R> ArrowReaderBuilder<R> {
133137
self
134138
}
135139

140+
/// Set a predicate for row group filtering
141+
///
142+
/// The predicate will be evaluated against row group statistics to automatically
143+
/// generate a [`RowSelection`] that skips filtered row groups. This provides
144+
/// efficient predicate pushdown based on ORC row indexes.
145+
///
146+
/// The predicate is evaluated lazily when each stripe is read, using the row group
147+
/// statistics from the stripe's index section.
148+
///
149+
/// If both `with_predicate()` and `with_row_selection()` are called, the results
150+
/// are combined using logical AND (both conditions must be satisfied).
151+
///
152+
/// # Example
153+
///
154+
/// ```no_run
155+
/// # use std::fs::File;
156+
/// # use orc_rust::{ArrowReaderBuilder, Predicate, PredicateValue};
157+
/// let file = File::open("data.orc").unwrap();
158+
///
159+
/// // Filter: age >= 18
160+
/// let predicate = Predicate::gte("age", PredicateValue::Int32(Some(18)));
161+
///
162+
/// let reader = ArrowReaderBuilder::try_new(file)
163+
/// .unwrap()
164+
/// .with_predicate(predicate)
165+
/// .build();
166+
/// ```
167+
///
168+
/// # Notes
169+
///
170+
/// - Predicate evaluation requires row indexes to be present in the ORC file
171+
/// - If row indexes are missing or cannot be read, or if predicate evaluation fails (e.g., the column is not found), the predicate is ignored for that stripe (all rows are kept)
172+
/// - Only primitive columns have row indexes; predicates on compound types may be limited
173+
pub fn with_predicate(mut self, predicate: Predicate) -> Self {
174+
self.predicate = Some(predicate);
175+
self
176+
}
177+
136178
/// Returns the currently computed schema
137179
///
138180
/// Unless [`with_schema`](Self::with_schema) was called, this is computed dynamically
@@ -168,6 +210,7 @@ impl<R: ChunkReader> ArrowReaderBuilder<R> {
168210
.file_metadata
169211
.root_data_type()
170212
.project(&self.projection);
213+
let projected_data_type_clone = projected_data_type.clone();
171214
let cursor = Cursor {
172215
reader: self.reader,
173216
file_metadata: self.file_metadata,
@@ -181,6 +224,8 @@ impl<R: ChunkReader> ArrowReaderBuilder<R> {
181224
current_stripe: None,
182225
batch_size: self.batch_size,
183226
row_selection: self.row_selection,
227+
predicate: self.predicate,
228+
projected_data_type: projected_data_type_clone,
184229
}
185230
}
186231
}
@@ -191,6 +236,8 @@ pub struct ArrowReader<R> {
191236
current_stripe: Option<Box<dyn Iterator<Item = Result<RecordBatch>> + Send>>,
192237
batch_size: usize,
193238
row_selection: Option<RowSelection>,
239+
predicate: Option<Predicate>,
240+
projected_data_type: RootDataType,
194241
}
195242

196243
impl<R> ArrowReader<R> {
@@ -204,21 +251,67 @@ impl<R: ChunkReader> ArrowReader<R> {
204251
let stripe = self.cursor.next().transpose()?;
205252
match stripe {
206253
Some(stripe) => {
207-
// Split off the row selection for this stripe
208254
let stripe_rows = stripe.number_of_rows();
209-
let selection = self.row_selection.as_mut().and_then(|s| {
210-
if s.row_count() > 0 {
211-
Some(s.split_off(stripe_rows))
212-
} else {
213-
None
255+
256+
// Evaluate predicate if present
257+
let mut stripe_selection: Option<RowSelection> = None;
258+
if let Some(ref predicate) = self.predicate {
259+
// Try to read row indexes for this stripe
260+
match stripe.read_row_indexes(&self.cursor.file_metadata) {
261+
Ok(row_index) => {
262+
// Evaluate predicate against row group statistics
263+
match evaluate_predicate(
264+
predicate,
265+
&row_index,
266+
&self.projected_data_type,
267+
) {
268+
Ok(row_group_filter) => {
269+
// Generate RowSelection from filter results
270+
let rows_per_group = self
271+
.cursor
272+
.file_metadata
273+
.row_index_stride()
274+
.unwrap_or(10_000);
275+
stripe_selection = Some(RowSelection::from_row_group_filter(
276+
&row_group_filter,
277+
rows_per_group,
278+
stripe_rows,
279+
));
280+
}
281+
Err(_) => {
282+
// Predicate evaluation failed (e.g., column not found)
283+
// Fall back to keeping every row in this stripe; the error itself is discarded
284+
stripe_selection = Some(RowSelection::select_all(stripe_rows));
285+
}
286+
}
287+
}
288+
Err(_) => {
289+
// Row indexes could not be read (any error); fall back to keeping every row in this stripe
290+
stripe_selection = Some(RowSelection::select_all(stripe_rows));
291+
}
292+
}
293+
}
294+
295+
// Combine with existing row_selection if present
296+
let mut final_selection = stripe_selection;
297+
if let Some(ref mut existing_selection) = self.row_selection {
298+
if existing_selection.row_count() > 0 {
299+
let existing_for_stripe = existing_selection.split_off(stripe_rows);
300+
final_selection = match final_selection {
301+
Some(predicate_selection) => {
302+
// Both predicate and manual selection: combine with AND
303+
Some(existing_for_stripe.and_then(&predicate_selection))
304+
}
305+
None => Some(existing_for_stripe),
306+
};
214307
}
215-
});
308+
}
216309

217310
let decoder = NaiveStripeDecoder::new_with_selection(
218311
stripe,
219312
self.schema_ref.clone(),
220313
self.batch_size,
221-
selection,
314+
final_selection,
222315
)?;
223316
self.current_stripe = Some(Box::new(decoder));
224317
self.next().transpose()

src/async_arrow_reader.rs

Lines changed: 78 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,12 @@ use futures_util::FutureExt;
3030
use crate::array_decoder::NaiveStripeDecoder;
3131
use crate::arrow_reader::Cursor;
3232
use crate::error::Result;
33+
use crate::predicate::Predicate;
3334
use crate::reader::metadata::read_metadata_async;
3435
use crate::reader::AsyncChunkReader;
36+
use crate::row_group_filter::evaluate_predicate;
3537
use crate::row_selection::RowSelection;
38+
use crate::schema::RootDataType;
3639
use crate::stripe::{Stripe, StripeMetadata};
3740
use crate::ArrowReaderBuilder;
3841

@@ -79,6 +82,9 @@ pub struct ArrowStreamReader<R: AsyncChunkReader> {
7982
batch_size: usize,
8083
schema_ref: SchemaRef,
8184
row_selection: Option<RowSelection>,
85+
predicate: Option<Predicate>,
86+
projected_data_type: RootDataType,
87+
file_metadata: Arc<crate::reader::metadata::FileMetadata>,
8288
state: StreamState<R>,
8389
}
8490

@@ -131,12 +137,18 @@ impl<R: AsyncChunkReader + 'static> ArrowStreamReader<R> {
131137
batch_size: usize,
132138
schema_ref: SchemaRef,
133139
row_selection: Option<RowSelection>,
140+
predicate: Option<Predicate>,
141+
projected_data_type: RootDataType,
142+
file_metadata: Arc<crate::reader::metadata::FileMetadata>,
134143
) -> Self {
135144
Self {
136145
factory: Some(Box::new(cursor.into())),
137146
batch_size,
138147
schema_ref,
139148
row_selection,
149+
predicate,
150+
projected_data_type,
151+
file_metadata,
140152
state: StreamState::Init,
141153
}
142154
}
@@ -180,21 +192,68 @@ impl<R: AsyncChunkReader + 'static> ArrowStreamReader<R> {
180192
Ok((factory, Some(stripe))) => {
181193
self.factory = Some(Box::new(factory));
182194

183-
// Split off the row selection for this stripe
184195
let stripe_rows = stripe.number_of_rows();
185-
let selection = self.row_selection.as_mut().and_then(|s| {
186-
if s.row_count() > 0 {
187-
Some(s.split_off(stripe_rows))
188-
} else {
189-
None
196+
197+
// Evaluate predicate if present
198+
let mut stripe_selection: Option<RowSelection> = None;
199+
if let Some(ref predicate) = self.predicate {
200+
// Try to read row indexes for this stripe
201+
match stripe.read_row_indexes(&self.file_metadata) {
202+
Ok(row_index) => {
203+
// Evaluate predicate against row group statistics
204+
match evaluate_predicate(
205+
predicate,
206+
&row_index,
207+
&self.projected_data_type,
208+
) {
209+
Ok(row_group_filter) => {
210+
// Generate RowSelection from filter results
211+
let rows_per_group = self
212+
.file_metadata
213+
.row_index_stride()
214+
.unwrap_or(10_000);
215+
stripe_selection =
216+
Some(RowSelection::from_row_group_filter(
217+
&row_group_filter,
218+
rows_per_group,
219+
stripe_rows,
220+
));
221+
}
222+
Err(_) => {
223+
// Predicate evaluation failed (e.g., column not found)
224+
// Fall back to keeping every row in this stripe; the error itself is discarded
225+
stripe_selection =
226+
Some(RowSelection::select_all(stripe_rows));
227+
}
228+
}
229+
}
230+
Err(_) => {
231+
// Row indexes could not be read (any error); fall back to keeping every row in this stripe
232+
stripe_selection = Some(RowSelection::select_all(stripe_rows));
233+
}
234+
}
235+
}
236+
237+
// Combine with existing row_selection if present
238+
let mut final_selection = stripe_selection;
239+
if let Some(ref mut existing_selection) = self.row_selection {
240+
if existing_selection.row_count() > 0 {
241+
let existing_for_stripe = existing_selection.split_off(stripe_rows);
242+
final_selection = match final_selection {
243+
Some(predicate_selection) => {
244+
// Both predicate and manual selection: combine with AND
245+
Some(existing_for_stripe.and_then(&predicate_selection))
246+
}
247+
None => Some(existing_for_stripe),
248+
};
190249
}
191-
});
250+
}
192251

193252
match NaiveStripeDecoder::new_with_selection(
194253
stripe,
195254
self.schema_ref.clone(),
196255
self.batch_size,
197-
selection,
256+
final_selection,
198257
) {
199258
Ok(decoder) => {
200259
self.state = StreamState::Decoding(Box::new(decoder));
@@ -241,14 +300,23 @@ impl<R: AsyncChunkReader + 'static> ArrowReaderBuilder<R> {
241300
.file_metadata()
242301
.root_data_type()
243302
.project(&self.projection);
303+
let projected_data_type_clone = projected_data_type.clone();
244304
let schema_ref = self.schema();
245305
let cursor = Cursor {
246306
reader: self.reader,
247-
file_metadata: self.file_metadata,
307+
file_metadata: self.file_metadata.clone(),
248308
projected_data_type,
249309
stripe_index: 0,
250310
file_byte_range: self.file_byte_range,
251311
};
252-
ArrowStreamReader::new(cursor, self.batch_size, schema_ref, self.row_selection)
312+
ArrowStreamReader::new(
313+
cursor,
314+
self.batch_size,
315+
schema_ref,
316+
self.row_selection,
317+
self.predicate,
318+
projected_data_type_clone,
319+
self.file_metadata,
320+
)
253321
}
254322
}

src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,13 @@ pub mod compression;
5757
mod encoding;
5858
pub mod error;
5959
mod memory;
60+
pub mod predicate;
6061
pub mod projection;
6162
#[allow(dead_code)]
6263
mod proto;
6364
pub mod reader;
65+
pub mod row_group_filter;
66+
pub mod row_index;
6467
pub mod row_selection;
6568
pub mod schema;
6669
pub mod statistics;
@@ -71,5 +74,6 @@ pub use arrow_reader::{ArrowReader, ArrowReaderBuilder};
7174
pub use arrow_writer::{ArrowWriter, ArrowWriterBuilder};
7275
#[cfg(feature = "async")]
7376
pub use async_arrow_reader::ArrowStreamReader;
77+
pub use predicate::{ComparisonOp, Predicate, PredicateValue};
7478
pub use row_selection::{RowSelection, RowSelector};
7579
pub use schema::{ArrowSchemaOptions, TimestampPrecision};

0 commit comments

Comments
 (0)