Skip to content

Commit ee714ca

Browse files
committed
Move Selection logic into ReadPlan builder
1 parent 0d774fe commit ee714ca

File tree

3 files changed

+525
-67
lines changed

3 files changed

+525
-67
lines changed

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 32 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -808,54 +808,45 @@ impl ParquetRecordBatchReader {
808808
/// Returns `Result<Option<..>>` rather than `Option<Result<..>>` to
809809
/// simplify error handling with `?`
810810
fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
811+
let mut end_of_stream = false;
811812
let mut read_records = 0;
812813
let batch_size = self.batch_size();
813-
match self.read_plan.selection_mut() {
814-
Some(selection) => {
815-
while read_records < batch_size && !selection.is_empty() {
816-
let front = selection.pop_front().unwrap();
817-
if front.skip {
818-
let skipped = self.array_reader.skip_records(front.row_count)?;
819-
820-
if skipped != front.row_count {
821-
return Err(general_err!(
822-
"failed to skip rows, expected {}, got {}",
823-
front.row_count,
824-
skipped
825-
));
826-
}
827-
continue;
828-
}
814+
while read_records < batch_size {
815+
let Some(front) = self.read_plan.next() else {
816+
end_of_stream = true;
817+
break;
818+
};
829819

830-
//Currently, when RowSelectors with row_count = 0 are included then its interpreted as end of reader.
831-
//Fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669
832-
if front.row_count == 0 {
833-
continue;
834-
}
820+
if front.skip {
821+
let skipped = self.array_reader.skip_records(front.row_count)?;
835822

836-
// try to read record
837-
let need_read = batch_size - read_records;
838-
let to_read = match front.row_count.checked_sub(need_read) {
839-
Some(remaining) if remaining != 0 => {
840-
// if page row count less than batch_size we must set batch size to page row count.
841-
// add check avoid dead loop
842-
selection.push_front(RowSelector::select(remaining));
843-
need_read
844-
}
845-
_ => front.row_count,
846-
};
847-
match self.array_reader.read_records(to_read)? {
848-
0 => break,
849-
rec => read_records += rec,
850-
};
823+
if skipped != front.row_count {
824+
return Err(general_err!(
825+
"Internal Error: failed to skip rows, expected {}, got {}",
826+
front.row_count,
827+
skipped
828+
));
829+
}
830+
} else {
831+
let read = self.array_reader.read_records(front.row_count)?;
832+
if read == 0 {
833+
end_of_stream = true;
834+
break;
851835
}
852-
}
853-
None => {
854-
self.array_reader.read_records(batch_size)?;
855-
}
856-
};
836+
837+
read_records += read
838+
};
839+
}
857840

858841
let array = self.array_reader.consume_batch()?;
842+
843+
// Reader should read exactly `batch_size` records except for last batch
844+
if !end_of_stream && (read_records != batch_size) {
845+
return Err(general_err!(
846+
"Internal Error: unexpected read count. Expected {batch_size} got {read_records}"
847+
));
848+
}
849+
859850
let struct_array = array.as_struct_opt().ok_or_else(|| {
860851
ArrowError::ParquetError("Struct array reader should return struct array".to_string())
861852
})?;

0 commit comments

Comments
 (0)