Skip to content

Commit 61cf695

Browse files
committed
[minor] use arrow kernel concat_batches instead of combine_batches
Signed-off-by: yangjiang <[email protected]>
1 parent dd3f72a commit 61cf695

File tree

3 files changed

+6
-74
lines changed

3 files changed

+6
-74
lines changed

datafusion/core/src/physical_plan/common.rs

Lines changed: 0 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ use crate::error::{DataFusionError, Result};
2222
use crate::execution::context::TaskContext;
2323
use crate::physical_plan::metrics::MemTrackingMetrics;
2424
use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, Statistics};
25-
use arrow::compute::concat;
2625
use arrow::datatypes::{Schema, SchemaRef};
2726
use arrow::error::ArrowError;
2827
use arrow::error::Result as ArrowResult;
@@ -96,32 +95,6 @@ pub async fn collect(stream: SendableRecordBatchStream) -> Result<Vec<RecordBatc
9695
.map_err(DataFusionError::from)
9796
}
9897

99-
/// Combine a slice of record batches into one, or returns None if the slice itself
100-
/// is empty; all the record batches inside the slice must be of the same schema.
101-
pub(crate) fn combine_batches(
102-
batches: &[RecordBatch],
103-
schema: SchemaRef,
104-
) -> ArrowResult<Option<RecordBatch>> {
105-
if batches.is_empty() {
106-
Ok(None)
107-
} else {
108-
let columns = schema
109-
.fields()
110-
.iter()
111-
.enumerate()
112-
.map(|(i, _)| {
113-
concat(
114-
&batches
115-
.iter()
116-
.map(|batch| batch.column(i).as_ref())
117-
.collect::<Vec<_>>(),
118-
)
119-
})
120-
.collect::<ArrowResult<Vec<_>>>()?;
121-
Ok(Some(RecordBatch::try_new(schema.clone(), columns)?))
122-
}
123-
}
124-
12598
/// Recursively builds a list of files in a directory with a given extension
12699
pub fn build_checked_file_list(dir: &str, ext: &str) -> Result<Vec<String>> {
127100
let mut filenames: Vec<String> = Vec::new();
@@ -303,46 +276,6 @@ mod tests {
303276
record_batch::RecordBatch,
304277
};
305278

306-
#[test]
307-
fn test_combine_batches_empty() -> Result<()> {
308-
let schema = Arc::new(Schema::new(vec![
309-
Field::new("f32", DataType::Float32, false),
310-
Field::new("f64", DataType::Float64, false),
311-
]));
312-
let result = combine_batches(&[], schema)?;
313-
assert!(result.is_none());
314-
Ok(())
315-
}
316-
317-
#[test]
318-
fn test_combine_batches() -> Result<()> {
319-
let schema = Arc::new(Schema::new(vec![
320-
Field::new("f32", DataType::Float32, false),
321-
Field::new("f64", DataType::Float64, false),
322-
]));
323-
324-
let batch_count = 1000;
325-
let batch_size = 10;
326-
let batches = (0..batch_count)
327-
.map(|i| {
328-
RecordBatch::try_new(
329-
Arc::clone(&schema),
330-
vec![
331-
Arc::new(Float32Array::from_slice(vec![i as f32; batch_size])),
332-
Arc::new(Float64Array::from_slice(vec![i as f64; batch_size])),
333-
],
334-
)
335-
.unwrap()
336-
})
337-
.collect::<Vec<_>>();
338-
339-
let result = combine_batches(&batches, schema)?;
340-
assert!(result.is_some());
341-
let result = result.unwrap();
342-
assert_eq!(batch_count * batch_size, result.num_rows());
343-
Ok(())
344-
}
345-
346279
#[test]
347280
fn test_compute_record_batch_statistics_empty() -> Result<()> {
348281
let schema = Arc::new(Schema::new(vec![

datafusion/core/src/physical_plan/joins/sort_merge_join.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use std::sync::Arc;
3030
use std::task::{Context, Poll};
3131

3232
use arrow::array::*;
33-
use arrow::compute::{take, SortOptions};
33+
use arrow::compute::{concat_batches, take, SortOptions};
3434
use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
3535
use arrow::error::{ArrowError, Result as ArrowResult};
3636
use arrow::record_batch::RecordBatch;
@@ -40,7 +40,6 @@ use crate::error::DataFusionError;
4040
use crate::error::Result;
4141
use crate::execution::context::TaskContext;
4242
use crate::logical_expr::JoinType;
43-
use crate::physical_plan::common::combine_batches;
4443
use crate::physical_plan::expressions::Column;
4544
use crate::physical_plan::expressions::PhysicalSortExpr;
4645
use crate::physical_plan::joins::utils::{
@@ -1085,8 +1084,7 @@ impl SMJStream {
10851084
}
10861085

10871086
fn output_record_batch_and_reset(&mut self) -> ArrowResult<RecordBatch> {
1088-
let record_batch =
1089-
combine_batches(&self.output_record_batches, self.schema.clone())?.unwrap();
1087+
let record_batch = concat_batches(&self.schema, &self.output_record_batches)?;
10901088
self.join_metrics.output_batches.add(1);
10911089
self.join_metrics.output_rows.add(record_batch.num_rows());
10921090
self.output_size -= record_batch.num_rows();

datafusion/core/src/physical_plan/windows/window_agg_exec.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@ use crate::physical_plan::metrics::{
2424
BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
2525
};
2626
use crate::physical_plan::{
27-
common, ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
27+
ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
2828
ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
2929
SendableRecordBatchStream, Statistics, WindowExpr,
3030
};
31+
use arrow::compute::concat_batches;
3132
use arrow::{
3233
array::ArrayRef,
3334
datatypes::{Schema, SchemaRef},
@@ -274,8 +275,8 @@ impl WindowAggStream {
274275
// record compute time on drop
275276
let _timer = self.baseline_metrics.elapsed_compute().timer();
276277

277-
let batch = common::combine_batches(&self.batches, self.input.schema())?;
278-
if let Some(batch) = batch {
278+
let batch = concat_batches(&self.input.schema(), &self.batches);
279+
if let Ok(batch) = batch {
279280
// calculate window cols
280281
let mut columns = compute_window_aggregates(&self.window_expr, &batch)
281282
.map_err(|e| ArrowError::ExternalError(Box::new(e)))?;

0 commit comments

Comments
 (0)