From 26e3d2e11189a902b2673c95623ce1561de44856 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sun, 4 May 2025 10:31:44 +0200 Subject: [PATCH 1/6] Speedup filter_bytes --- arrow-select/src/filter.rs | 43 +++++++++++++++++++++++++++++++------- 1 file changed, 35 insertions(+), 8 deletions(-) diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index 7bb140d37f51..7070dc4259f2 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -556,14 +556,14 @@ fn filter_native(values: &[T], predicate: &FilterPredicate) let buffer = match &predicate.strategy { IterationStrategy::SlicesIterator => { - let mut buffer = MutableBuffer::with_capacity(predicate.count * T::get_byte_width()); + let mut buffer = Vec::with_capacity(predicate.count); for (start, end) in SlicesIterator::new(&predicate.filter) { buffer.extend_from_slice(&values[start..end]); } buffer } IterationStrategy::Slices(slices) => { - let mut buffer = MutableBuffer::with_capacity(predicate.count * T::get_byte_width()); + let mut buffer = Vec::with_capacity(predicate.count); for (start, end) in slices { buffer.extend_from_slice(&values[*start..*end]); } @@ -572,13 +572,11 @@ fn filter_native(values: &[T], predicate: &FilterPredicate) IterationStrategy::IndexIterator => { let iter = IndexIterator::new(&predicate.filter, predicate.count).map(|x| values[x]); - // SAFETY: IndexIterator is trusted length - unsafe { MutableBuffer::from_trusted_len_iter(iter) } + iter.collect() } IterationStrategy::Indices(indices) => { let iter = indices.iter().map(|x| values[*x]); - // SAFETY: `Vec::iter` is trusted length - unsafe { MutableBuffer::from_trusted_len_iter(iter) } + iter.collect() } IterationStrategy::All | IterationStrategy::None => unreachable!(), }; @@ -656,6 +654,16 @@ where (start, end, len) } + fn set_capacity_idx(&mut self, iter: impl Iterator) { + let mut capacity = 0; + for idx in iter { + let start = self.src_offsets[idx].as_usize(); + let end = self.src_offsets[idx + 1].as_usize(); + capacity += end - start; + } + self.dst_values.reserve_exact(capacity); + } + /// Extends the in-progress array by the indexes in the provided iterator fn extend_idx(&mut self, iter: impl Iterator) { self.dst_offsets.extend(iter.map(|idx| { @@ -669,6 +677,16 @@ where })); } + fn set_capacity_slices(&mut self, iter: impl Iterator) { + let mut capacity = 0; + for (start, end) in iter { + let value_start = self.get_value_offset(start); + let value_end = self.get_value_offset(end); + capacity += value_end - value_start; + } + self.dst_values.reserve_exact(capacity); + } + /// Extends the in-progress array by the ranges in the provided iterator fn extend_slices(&mut self, iter: impl Iterator) { for (start, end) in iter { @@ -699,13 +717,22 @@ where match &predicate.strategy { IterationStrategy::SlicesIterator => { + filter.set_capacity_slices(SlicesIterator::new(&predicate.filter)); filter.extend_slices(SlicesIterator::new(&predicate.filter)) } - IterationStrategy::Slices(slices) => filter.extend_slices(slices.iter().cloned()), + IterationStrategy::Slices(slices) => { + filter.set_capacity_slices(slices.iter().cloned()); + filter.extend_slices(slices.iter().cloned()) + } IterationStrategy::IndexIterator => { + filter.set_capacity_idx(IndexIterator::new(&predicate.filter, predicate.count)); filter.extend_idx(IndexIterator::new(&predicate.filter, predicate.count)) } - IterationStrategy::Indices(indices) => filter.extend_idx(indices.iter().cloned()), + IterationStrategy::Indices(indices) => { + filter.set_capacity_idx(IndexIterator::new(&predicate.filter, predicate.count)); + + filter.extend_idx(indices.iter().cloned()) + } IterationStrategy::All | IterationStrategy::None => unreachable!(), } From 37f02b59bbcaf0e76e78c6344a33d29a09659a60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sun, 4 May 2025 11:22:06 +0200 Subject: [PATCH 2/6] Speedup filter_bytes --- arrow-select/src/filter.rs | 65 +++++++++++++++++++++++--------------- 1 file changed, 39 insertions(+), 26 deletions(-) diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index 7070dc4259f2..3105b63da4da 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -615,6 +615,25 @@ struct FilterBytes<'a, OffsetSize> { cur_offset: OffsetSize, } +/// abc +pub trait PushUnchecked { + /// Will push an item and not check if there is enough capacity + /// + /// # Safety + /// Caller must ensure the array has enough capacity to hold `T`. + unsafe fn push_unchecked(&mut self, value: T); +} + +impl PushUnchecked for Vec { + #[inline] + unsafe fn push_unchecked(&mut self, value: T) { + debug_assert!(self.capacity() > self.len()); + let end = self.as_mut_ptr().add(self.len()); + std::ptr::write(end, value); + self.set_len(self.len() + 1); + } +} + impl<'a, OffsetSize> FilterBytes<'a, OffsetSize> where OffsetSize: OffsetSizeTrait, @@ -655,48 +674,43 @@ where } fn set_capacity_idx(&mut self, iter: impl Iterator) { - let mut capacity = 0; - for idx in iter { + self.dst_offsets.extend(iter.map(|idx| { let start = self.src_offsets[idx].as_usize(); let end = self.src_offsets[idx + 1].as_usize(); - capacity += end - start; - } - self.dst_values.reserve_exact(capacity); + let len = OffsetSize::from_usize(end - start).expect("illegal offset range"); + self.cur_offset += len; + + self.cur_offset + })); + self.dst_values.reserve_exact(self.cur_offset.as_usize()); } /// Extends the in-progress array by the indexes in the provided iterator fn extend_idx(&mut self, iter: impl Iterator) { - self.dst_offsets.extend(iter.map(|idx| { + for idx in iter { let start = self.src_offsets[idx].as_usize(); let end = self.src_offsets[idx + 1].as_usize(); - let len = OffsetSize::from_usize(end - start).expect("illegal offset range"); - self.cur_offset += len; self.dst_values .extend_from_slice(&self.src_values[start..end]); - self.cur_offset - })); - } - - fn set_capacity_slices(&mut self, iter: impl Iterator) { - let mut capacity = 0; - for (start, end) in iter { - let value_start = self.get_value_offset(start); - let value_end = self.get_value_offset(end); - capacity += value_end - value_start; } - self.dst_values.reserve_exact(capacity); } - /// Extends the in-progress array by the ranges in the provided iterator - fn extend_slices(&mut self, iter: impl Iterator) { + fn set_capacity_slices(&mut self, iter: impl Iterator, count: usize) { + self.dst_offsets.reserve_exact(count); for (start, end) in iter { // These can only fail if `array` contains invalid data for idx in start..end { let (_, _, len) = self.get_value_range(idx); self.cur_offset += len; - self.dst_offsets.push(self.cur_offset); // push_unchecked? + self.dst_offsets.push(self.cur_offset); } + } + self.dst_values.reserve_exact(self.cur_offset.as_usize()); + } + /// Extends the in-progress array by the ranges in the provided iterator + fn extend_slices(&mut self, iter: impl Iterator) { + for (start, end) in iter { let value_start = self.get_value_offset(start); let value_end = self.get_value_offset(end); self.dst_values @@ -717,11 +731,11 @@ where match &predicate.strategy { IterationStrategy::SlicesIterator => { - filter.set_capacity_slices(SlicesIterator::new(&predicate.filter)); + filter.set_capacity_slices(SlicesIterator::new(&predicate.filter), predicate.count); filter.extend_slices(SlicesIterator::new(&predicate.filter)) } IterationStrategy::Slices(slices) => { - filter.set_capacity_slices(slices.iter().cloned()); + filter.set_capacity_slices(slices.iter().cloned(), predicate.count); filter.extend_slices(slices.iter().cloned()) } IterationStrategy::IndexIterator => { @@ -729,8 +743,7 @@ where filter.extend_idx(IndexIterator::new(&predicate.filter, predicate.count)) } IterationStrategy::Indices(indices) => { - filter.set_capacity_idx(IndexIterator::new(&predicate.filter, predicate.count)); - + filter.set_capacity_idx(indices.iter().cloned()); filter.extend_idx(indices.iter().cloned()) } IterationStrategy::All | IterationStrategy::None => unreachable!(), From 143d7bf8f961c71ce237577891adf873e26f3761 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sun, 4 May 2025 14:20:33 +0200 Subject: [PATCH 3/6] Cleanup --- arrow-select/src/filter.rs | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index 3105b63da4da..11eed12a4994 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -615,25 +615,6 @@ struct FilterBytes<'a, OffsetSize> { cur_offset: OffsetSize, } -/// abc -pub trait PushUnchecked { - /// Will push an item and not check if there is enough capacity - /// - /// # Safety - /// Caller must ensure the array has enough capacity to hold `T`. - unsafe fn push_unchecked(&mut self, value: T); -} - -impl PushUnchecked for Vec { - #[inline] - unsafe fn push_unchecked(&mut self, value: T) { - debug_assert!(self.capacity() > self.len()); - let end = self.as_mut_ptr().add(self.len()); - std::ptr::write(end, value); - self.set_len(self.len() + 1); - } -} - impl<'a, OffsetSize> FilterBytes<'a, OffsetSize> where OffsetSize: OffsetSizeTrait, From 2955b699b7e07a28880c2e428fefa5fe77054b71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sun, 4 May 2025 14:28:40 +0200 Subject: [PATCH 4/6] Cleanup --- arrow-select/src/filter.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index 11eed12a4994..657596ba7b9c 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -654,7 +654,7 @@ where (start, end, len) } - fn set_capacity_idx(&mut self, iter: impl Iterator) { + fn extend_offsets_idx(&mut self, iter: impl Iterator) { self.dst_offsets.extend(iter.map(|idx| { let start = self.src_offsets[idx].as_usize(); let end = self.src_offsets[idx + 1].as_usize(); @@ -676,7 +676,7 @@ where } } - fn set_capacity_slices(&mut self, iter: impl Iterator, count: usize) { + fn extend_offsets_slices(&mut self, iter: impl Iterator, count: usize) { self.dst_offsets.reserve_exact(count); for (start, end) in iter { // These can only fail if `array` contains invalid data @@ -712,19 +712,19 @@ where match &predicate.strategy { IterationStrategy::SlicesIterator => { - filter.set_capacity_slices(SlicesIterator::new(&predicate.filter), predicate.count); + filter.extend_offsets_slices(SlicesIterator::new(&predicate.filter), predicate.count); filter.extend_slices(SlicesIterator::new(&predicate.filter)) } IterationStrategy::Slices(slices) => { - filter.set_capacity_slices(slices.iter().cloned(), predicate.count); + filter.extend_offsets_slices(slices.iter().cloned(), predicate.count); filter.extend_slices(slices.iter().cloned()) } IterationStrategy::IndexIterator => { - filter.set_capacity_idx(IndexIterator::new(&predicate.filter, predicate.count)); + filter.extend_offsets_idx(IndexIterator::new(&predicate.filter, predicate.count)); filter.extend_idx(IndexIterator::new(&predicate.filter, predicate.count)) } IterationStrategy::Indices(indices) => { - filter.set_capacity_idx(indices.iter().cloned()); + filter.extend_offsets_idx(indices.iter().cloned()); filter.extend_idx(indices.iter().cloned()) } IterationStrategy::All | IterationStrategy::None => unreachable!(), From 816bab22546bebe865f1be855f6d239673f365ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sun, 4 May 2025 15:45:54 +0200 Subject: [PATCH 5/6] WIP --- arrow-select/src/filter.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index 657596ba7b9c..9da766273d29 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -554,34 +554,33 @@ fn filter_boolean(array: &BooleanArray, predicate: &FilterPredicate) -> BooleanA fn filter_native(values: &[T], predicate: &FilterPredicate) -> Buffer { assert!(values.len() >= predicate.filter.len()); - let buffer = match &predicate.strategy { + match &predicate.strategy { IterationStrategy::SlicesIterator => { let mut buffer = Vec::with_capacity(predicate.count); for (start, end) in SlicesIterator::new(&predicate.filter) { buffer.extend_from_slice(&values[start..end]); } - buffer + buffer.into() } IterationStrategy::Slices(slices) => { let mut buffer = Vec::with_capacity(predicate.count); for (start, end) in slices { buffer.extend_from_slice(&values[*start..*end]); } - buffer + buffer.into() } IterationStrategy::IndexIterator => { let iter = IndexIterator::new(&predicate.filter, predicate.count).map(|x| values[x]); - iter.collect() + // SAFETY: IndexIterator is trusted length + unsafe { MutableBuffer::from_trusted_len_iter(iter) }.into() } IterationStrategy::Indices(indices) => { let iter = indices.iter().map(|x| values[*x]); - iter.collect() + iter.collect::>().into() } IterationStrategy::All | IterationStrategy::None => unreachable!(), - }; - - buffer.into() + } } /// `filter` implementation for primitive arrays From 469441e81c12af4ec2ebe87ab98bdb2bb0510c3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Mon, 5 May 2025 17:58:02 +0200 Subject: [PATCH 6/6] Move allocation --- arrow-select/src/filter.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index 9da766273d29..ad205dfea31c 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -662,11 +662,12 @@ where self.cur_offset })); - self.dst_values.reserve_exact(self.cur_offset.as_usize()); } /// Extends the in-progress array by the indexes in the provided iterator fn extend_idx(&mut self, iter: impl Iterator) { + self.dst_values.reserve_exact(self.cur_offset.as_usize()); + for idx in iter { let start = self.src_offsets[idx].as_usize(); let end = self.src_offsets[idx + 1].as_usize(); @@ -685,11 +686,12 @@ where self.dst_offsets.push(self.cur_offset); } } - self.dst_values.reserve_exact(self.cur_offset.as_usize()); } /// Extends the in-progress array by the ranges in the provided iterator fn extend_slices(&mut self, iter: impl Iterator) { + self.dst_values.reserve_exact(self.cur_offset.as_usize()); + for (start, end) in iter { let value_start = self.get_value_offset(start); let value_end = self.get_value_offset(end);