Skip to content

Commit 2b58fd5

Browse files
XiangpengHaoalamb
andauthored
GC StringViewArray in CoalesceBatchesStream (#11587)
* gc string view when appropriate * make clippy happy * address comments * make doc happy * update style * Add comments and tests for gc_string_view_batch * better herustic * update test * Update datafusion/physical-plan/src/coalesce_batches.rs Co-authored-by: Andrew Lamb <[email protected]> --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent bb780b3 commit 2b58fd5

File tree

1 file changed

+170
-1
lines changed

1 file changed

+170
-1
lines changed

datafusion/physical-plan/src/coalesce_batches.rs

Lines changed: 170 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,11 @@ use crate::{
2929
DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
3030
};
3131

32+
use arrow::array::{AsArray, StringViewBuilder};
3233
use arrow::datatypes::SchemaRef;
3334
use arrow::error::Result as ArrowResult;
3435
use arrow::record_batch::RecordBatch;
36+
use arrow_array::{Array, ArrayRef};
3537
use datafusion_common::Result;
3638
use datafusion_execution::TaskContext;
3739

@@ -216,6 +218,8 @@ impl CoalesceBatchesStream {
216218
match input_batch {
217219
Poll::Ready(x) => match x {
218220
Some(Ok(batch)) => {
221+
let batch = gc_string_view_batch(&batch);
222+
219223
if batch.num_rows() >= self.target_batch_size
220224
&& self.buffer.is_empty()
221225
{
@@ -290,13 +294,83 @@ pub fn concat_batches(
290294
arrow::compute::concat_batches(schema, batches)
291295
}
292296

297+
/// Heuristically compact [`StringViewArray`]s to reduce memory usage, if needed
298+
///
299+
/// This function decides when to consolidate the StringView into a new buffer
300+
/// to reduce memory usage and improve string locality for better performance.
301+
///
302+
/// This differs from [`StringViewArray::gc`] because:
303+
/// 1. It may not compact the array depending on a heuristic.
304+
/// 2. It uses a larger default block size (2MB) to reduce the number of buffers to track.
305+
///
306+
/// # Heuristic
307+
///
308+
/// If the average size of each view is larger than 32 bytes, we compact the array.
309+
///
310+
/// `StringViewArray` include pointers to buffer that hold the underlying data.
311+
/// One of the great benefits of `StringViewArray` is that many operations
312+
/// (e.g., `filter`) can be done without copying the underlying data.
313+
///
314+
/// However, after a while (e.g., after `FilterExec` or `HashJoinExec`) the
315+
/// `StringViewArray` may only refer to a small portion of the buffer,
316+
/// significantly increasing memory usage.
317+
fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch {
318+
let new_columns: Vec<ArrayRef> = batch
319+
.columns()
320+
.iter()
321+
.map(|c| {
322+
// Try to re-create the `StringViewArray` to prevent holding the underlying buffer too long.
323+
let Some(s) = c.as_string_view_opt() else {
324+
return Arc::clone(c);
325+
};
326+
let ideal_buffer_size: usize = s
327+
.views()
328+
.iter()
329+
.map(|v| {
330+
let len = (*v as u32) as usize;
331+
if len > 12 {
332+
len
333+
} else {
334+
0
335+
}
336+
})
337+
.sum();
338+
let actual_buffer_size = s.get_buffer_memory_size();
339+
340+
// Re-creating the array copies data and can be time consuming.
341+
// We only do it if the array is sparse
342+
if actual_buffer_size > (ideal_buffer_size * 2) {
343+
// We set the block size to `ideal_buffer_size` so that the new StringViewArray only has one buffer, which accelerate later concat_batches.
344+
// See https://github.com/apache/arrow-rs/issues/6094 for more details.
345+
let mut builder = StringViewBuilder::with_capacity(s.len())
346+
.with_block_size(ideal_buffer_size as u32);
347+
348+
for v in s.iter() {
349+
builder.append_option(v);
350+
}
351+
352+
let gc_string = builder.finish();
353+
354+
debug_assert!(gc_string.data_buffers().len() <= 1); // buffer count can be 0 if the `ideal_buffer_size` is 0
355+
356+
Arc::new(gc_string)
357+
} else {
358+
Arc::clone(c)
359+
}
360+
})
361+
.collect();
362+
RecordBatch::try_new(batch.schema(), new_columns)
363+
.expect("Failed to re-create the gc'ed record batch")
364+
}
365+
293366
#[cfg(test)]
294367
mod tests {
295368
use super::*;
296369
use crate::{memory::MemoryExec, repartition::RepartitionExec, Partitioning};
297370

298371
use arrow::datatypes::{DataType, Field, Schema};
299-
use arrow_array::UInt32Array;
372+
use arrow_array::builder::ArrayBuilder;
373+
use arrow_array::{StringViewArray, UInt32Array};
300374

301375
#[tokio::test(flavor = "multi_thread")]
302376
async fn test_concat_batches() -> Result<()> {
@@ -369,4 +443,99 @@ mod tests {
369443
)
370444
.unwrap()
371445
}
446+
447+
#[test]
448+
fn test_gc_string_view_batch_small_no_compact() {
449+
// view with only short strings (no buffers) --> no need to compact
450+
let array = StringViewTest {
451+
rows: 1000,
452+
strings: vec![Some("a"), Some("b"), Some("c")],
453+
}
454+
.build();
455+
456+
let gc_array = do_gc(array.clone());
457+
compare_string_array_values(&array, &gc_array);
458+
assert_eq!(array.data_buffers().len(), 0);
459+
assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction
460+
}
461+
462+
#[test]
463+
fn test_gc_string_view_batch_large_no_compact() {
464+
// view with large strings (has buffers) but full --> no need to compact
465+
let array = StringViewTest {
466+
rows: 1000,
467+
strings: vec![Some("This string is longer than 12 bytes")],
468+
}
469+
.build();
470+
471+
let gc_array = do_gc(array.clone());
472+
compare_string_array_values(&array, &gc_array);
473+
assert_eq!(array.data_buffers().len(), 5);
474+
assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction
475+
}
476+
477+
#[test]
478+
fn test_gc_string_view_batch_large_slice_compact() {
479+
// view with large strings (has buffers) and only partially used --> no need to compact
480+
let array = StringViewTest {
481+
rows: 1000,
482+
strings: vec![Some("this string is longer than 12 bytes")],
483+
}
484+
.build();
485+
486+
// slice only 11 rows, so most of the buffer is not used
487+
let array = array.slice(11, 22);
488+
489+
let gc_array = do_gc(array.clone());
490+
compare_string_array_values(&array, &gc_array);
491+
assert_eq!(array.data_buffers().len(), 5);
492+
assert_eq!(gc_array.data_buffers().len(), 1); // compacted into a single buffer
493+
}
494+
495+
/// Compares the values of two string view arrays
496+
fn compare_string_array_values(arr1: &StringViewArray, arr2: &StringViewArray) {
497+
assert_eq!(arr1.len(), arr2.len());
498+
for (s1, s2) in arr1.iter().zip(arr2.iter()) {
499+
assert_eq!(s1, s2);
500+
}
501+
}
502+
503+
/// runs garbage collection on string view array
504+
/// and ensures the number of rows are the same
505+
fn do_gc(array: StringViewArray) -> StringViewArray {
506+
let batch =
507+
RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)]).unwrap();
508+
let gc_batch = gc_string_view_batch(&batch);
509+
assert_eq!(batch.num_rows(), gc_batch.num_rows());
510+
assert_eq!(batch.schema(), gc_batch.schema());
511+
gc_batch
512+
.column(0)
513+
.as_any()
514+
.downcast_ref::<StringViewArray>()
515+
.unwrap()
516+
.clone()
517+
}
518+
519+
/// Describes parameters for creating a `StringViewArray`
520+
struct StringViewTest {
521+
/// The number of rows in the array
522+
rows: usize,
523+
/// The strings to use in the array (repeated over and over
524+
strings: Vec<Option<&'static str>>,
525+
}
526+
527+
impl StringViewTest {
528+
/// Create a `StringViewArray` with the parameters specified in this struct
529+
fn build(self) -> StringViewArray {
530+
let mut builder = StringViewBuilder::with_capacity(100);
531+
loop {
532+
for &v in self.strings.iter() {
533+
builder.append_option(v);
534+
if builder.len() >= self.rows {
535+
return builder.finish();
536+
}
537+
}
538+
}
539+
}
540+
}
372541
}

0 commit comments

Comments
 (0)