Commit f33628e

refactor: Remove spawn and channel inside arrow reader (#806)
Signed-off-by: Xuanwo <[email protected]>
1 parent 0e5a3c3 · commit f33628e
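
In summary: `ArrowReader::read` becomes an `async fn` that builds its output by composing stream combinators (`map_ok`, `try_buffer_unordered`, `try_flatten_unordered`) directly over the incoming `FileScanTaskStream`. `process_file_scan_task` now returns a stream of record batches instead of pushing them into an mpsc channel from a spawned task, and errors from the task stream surface through the returned stream rather than through a dedicated error channel.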

File tree

crates/iceberg/src/arrow/reader.rs
crates/iceberg/src/scan.rs

2 files changed: +35 -52 lines

crates/iceberg/src/arrow/reader.rs

Lines changed: 29 additions & 51 deletions
```diff
@@ -31,9 +31,8 @@ use arrow_schema::{
 use arrow_string::like::starts_with;
 use bytes::Bytes;
 use fnv::FnvHashSet;
-use futures::channel::mpsc::{channel, Sender};
 use futures::future::BoxFuture;
-use futures::{try_join, FutureExt, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
+use futures::{try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
 use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection};
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
@@ -48,7 +47,6 @@ use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
 use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
-use crate::runtime::spawn;
 use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
 use crate::spec::{Datum, PrimitiveType, Schema};
 use crate::utils::available_parallelism;
@@ -130,62 +128,41 @@ pub struct ArrowReader {
 impl ArrowReader {
     /// Take a stream of FileScanTasks and reads all the files.
     /// Returns a stream of Arrow RecordBatches containing the data from the files
-    pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
+    pub async fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
         let file_io = self.file_io.clone();
         let batch_size = self.batch_size;
         let concurrency_limit_data_files = self.concurrency_limit_data_files;
         let row_group_filtering_enabled = self.row_group_filtering_enabled;
         let row_selection_enabled = self.row_selection_enabled;
 
-        let (tx, rx) = channel(concurrency_limit_data_files);
-        let mut channel_for_error = tx.clone();
-
-        spawn(async move {
-            let result = tasks
-                .map(|task| Ok((task, file_io.clone(), tx.clone())))
-                .try_for_each_concurrent(
-                    concurrency_limit_data_files,
-                    |(file_scan_task, file_io, tx)| async move {
-                        match file_scan_task {
-                            Ok(task) => {
-                                let file_path = task.data_file_path.to_string();
-
-                                spawn(async move {
-                                    Self::process_file_scan_task(
-                                        task,
-                                        batch_size,
-                                        file_io,
-                                        tx,
-                                        row_group_filtering_enabled,
-                                        row_selection_enabled,
-                                    )
-                                    .await
-                                })
-                                .await
-                                .map_err(|e| e.with_context("file_path", file_path))
-                            }
-                            Err(err) => Err(err),
-                        }
-                    },
-                )
-                .await;
+        let stream = tasks
+            .map_ok(move |task| {
+                let file_io = file_io.clone();
 
-            if let Err(error) = result {
-                let _ = channel_for_error.send(Err(error)).await;
-            }
-        });
+                Self::process_file_scan_task(
+                    task,
+                    batch_size,
+                    file_io,
+                    row_group_filtering_enabled,
+                    row_selection_enabled,
+                )
+            })
+            .map_err(|err| {
+                Error::new(ErrorKind::Unexpected, "file scan task generate failed").with_source(err)
+            })
+            .try_buffer_unordered(concurrency_limit_data_files)
+            .try_flatten_unordered(concurrency_limit_data_files);
 
-        return Ok(rx.boxed());
+        Ok(Box::pin(stream) as ArrowRecordBatchStream)
     }
 
     async fn process_file_scan_task(
         task: FileScanTask,
         batch_size: Option<usize>,
         file_io: FileIO,
-        mut tx: Sender<Result<RecordBatch>>,
         row_group_filtering_enabled: bool,
         row_selection_enabled: bool,
-    ) -> Result<()> {
+    ) -> Result<ArrowRecordBatchStream> {
         // Get the metadata for the Parquet file we need to read and build
         // a reader for the data within
         let parquet_file = file_io.new_input(&task.data_file_path)?;
@@ -269,14 +246,15 @@ impl ArrowReader {
 
         // Build the batch stream and send all the RecordBatches that it generates
         // to the requester.
-        let mut record_batch_stream = record_batch_stream_builder.build()?;
-
-        while let Some(batch) = record_batch_stream.try_next().await? {
-            tx.send(record_batch_transformer.process_record_batch(batch))
-                .await?
-        }
-
-        Ok(())
+        let record_batch_stream =
+            record_batch_stream_builder
+                .build()?
+                .map(move |batch| match batch {
+                    Ok(batch) => record_batch_transformer.process_record_batch(batch),
+                    Err(err) => Err(err.into()),
+                });
+
+        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
     }
 
     fn build_field_id_set_and_map(
```
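
The core of the change is the composition in `read()`: a stream of fallible tasks is mapped to future-returning calls with `map_ok`, up to `concurrency_limit_data_files` of those futures are driven at once with `try_buffer_unordered`, and the per-file batch streams they yield are merged with `try_flatten_unordered`. A minimal, self-contained sketch of that pattern, using only the `futures` crate; the numeric "tasks" and "batches" are illustrative stand-ins, not the iceberg types:

```rust
use futures::executor::block_on;
use futures::stream::{self, TryStreamExt};

fn main() {
    block_on(async {
        // Stand-in for FileScanTaskStream: a stream of fallible tasks.
        let tasks = stream::iter(vec![Ok::<u32, String>(1), Ok(2), Ok(3)]);

        let merged = tasks
            // Like map_ok + process_file_scan_task: each Ok task becomes
            // a future that resolves to a sub-stream of fallible batches.
            .map_ok(|n| async move {
                Ok::<_, String>(stream::iter((0..n).map(Ok::<u32, String>)))
            })
            // Drive up to two of those futures concurrently.
            .try_buffer_unordered(2)
            // Merge the sub-streams, again with at most two in flight.
            .try_flatten_unordered(2);

        let batches: Vec<u32> = merged.try_collect().await.unwrap();
        assert_eq!(batches.len(), 6); // (0) + (0,1) + (0,1,2); order may vary
    });
}
```

Compared with the removed spawn-plus-channel version, nothing runs until the returned stream is polled, so backpressure comes from the consumer rather than from channel capacity, and per-file errors surface as `Err` items in the stream instead of being forwarded through a cloned `Sender`.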

crates/iceberg/src/scan.rs

Lines changed: 6 additions & 1 deletion
```diff
@@ -421,7 +421,10 @@ impl TableScan {
             arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
         }
 
-        arrow_reader_builder.build().read(self.plan_files().await?)
+        arrow_reader_builder
+            .build()
+            .read(self.plan_files().await?)
+            .await
     }
 
     /// Returns a reference to the column names of the table scan.
@@ -1410,12 +1413,14 @@ mod tests {
         let batch_stream = reader
             .clone()
             .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
+            .await
             .unwrap();
         let batche1: Vec<_> = batch_stream.try_collect().await.unwrap();
 
         let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
         let batch_stream = reader
             .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
+            .await
             .unwrap();
         let batche2: Vec<_> = batch_stream.try_collect().await.unwrap();
 
```
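
On the caller side, the visible change is the extra `.await`: `read` is now an `async fn` that resolves to the batch stream. A hypothetical sketch of that call shape (`MockReader` and the item types are invented for illustration and are not the crate's API):

```rust
use futures::executor::block_on;
use futures::stream::{self, BoxStream, TryStreamExt};

// Invented stand-in for ArrowRecordBatchStream.
type BatchStream = BoxStream<'static, Result<u64, String>>;

struct MockReader;

impl MockReader {
    // Mirrors the new shape: an async fn returning Result<stream>.
    async fn read(self, tasks: Vec<u64>) -> Result<BatchStream, String> {
        Ok(Box::pin(stream::iter(tasks.into_iter().map(Ok::<u64, String>))))
    }
}

fn main() {
    let batches: Vec<u64> = block_on(async {
        let stream = MockReader.read(vec![1, 2, 3]).await?; // the new .await
        stream.try_collect().await // consuming the stream is unchanged
    })
    .unwrap();
    assert_eq!(batches, vec![1, 2, 3]);
}
```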