diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index 82e7aafcee2b1..e4ba71e45c661 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -57,6 +57,10 @@ sql = ["sqlparser"] harness = false name = "with_hashes" +[[bench]] +harness = false +name = "scalar_to_array" + [dependencies] ahash = { workspace = true } apache-avro = { workspace = true, features = [ diff --git a/datafusion/common/benches/scalar_to_array.rs b/datafusion/common/benches/scalar_to_array.rs new file mode 100644 index 0000000000000..90a152e515fe5 --- /dev/null +++ b/datafusion/common/benches/scalar_to_array.rs @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Benchmarks for `ScalarValue::to_array_of_size`, focusing on List +//! scalars. + +use arrow::array::{Array, ArrayRef, AsArray, StringViewBuilder}; +use arrow::datatypes::{DataType, Field}; +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use datafusion_common::ScalarValue; +use datafusion_common::utils::SingleRowListArrayBuilder; +use std::sync::Arc; + +/// Build a `ScalarValue::List` of `num_elements` Utf8View strings whose +/// inner StringViewArray has `num_buffers` data buffers. +fn make_list_scalar(num_elements: usize, num_buffers: usize) -> ScalarValue { + let elements_per_buffer = num_elements.div_ceil(num_buffers); + + let mut small_arrays: Vec = Vec::new(); + let mut remaining = num_elements; + for buf_idx in 0..num_buffers { + let count = remaining.min(elements_per_buffer); + if count == 0 { + break; + } + let start = buf_idx * elements_per_buffer; + let mut builder = StringViewBuilder::with_capacity(count); + for i in start..start + count { + builder.append_value(format!("{i:024x}")); + } + small_arrays.push(Arc::new(builder.finish()) as ArrayRef); + remaining -= count; + } + + let refs: Vec<&dyn Array> = small_arrays.iter().map(|a| a.as_ref()).collect(); + let concated = arrow::compute::concat(&refs).unwrap(); + + let list_array = SingleRowListArrayBuilder::new(concated) + .with_field(&Field::new_list_field(DataType::Utf8View, true)) + .build_list_array(); + ScalarValue::List(Arc::new(list_array)) +} + +/// We want to measure the cost of doing the conversion and then also accessing +/// the results, to model what would happen during query evaluation. +fn consume_list_array(arr: &ArrayRef) { + let list_arr = arr.as_list::(); + let mut total_len: usize = 0; + for i in 0..list_arr.len() { + let inner = list_arr.value(i); + let sv = inner.as_string_view(); + for j in 0..sv.len() { + total_len += sv.value(j).len(); + } + } + std::hint::black_box(total_len); +} + +fn bench_list_to_array_of_size(c: &mut Criterion) { + let mut group = c.benchmark_group("list_to_array_of_size"); + + let num_elements = 1245; + let scalar_1buf = make_list_scalar(num_elements, 1); + let scalar_50buf = make_list_scalar(num_elements, 50); + + for batch_size in [256, 1024] { + group.bench_with_input( + BenchmarkId::new("1_buffer", batch_size), + &batch_size, + |b, &sz| { + b.iter(|| { + let arr = scalar_1buf.to_array_of_size(sz).unwrap(); + consume_list_array(&arr); + }); + }, + ); + group.bench_with_input( + BenchmarkId::new("50_buffers", batch_size), + &batch_size, + |b, &sz| { + b.iter(|| { + let arr = scalar_50buf.to_array_of_size(sz).unwrap(); + consume_list_array(&arr); + }); + }, + ); + } + + group.finish(); +} + +criterion_group!(benches, bench_list_to_array_of_size); +criterion_main!(benches); diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index 644916d7891c4..23aba2255f638 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -3008,7 +3008,7 @@ impl ScalarValue { /// /// Errors if `self` is /// - a decimal that fails be converted to a decimal array of size - /// - a `FixedsizeList` that fails to be concatenated into an array of size + /// - a `FixedSizeList` that fails to be concatenated into an array of size /// - a `List` that fails to be concatenated into an array of size /// - a `Dictionary` that fails be converted to a dictionary array of size pub fn to_array_of_size(&self, size: usize) -> Result { @@ -3435,12 +3435,13 @@ impl ScalarValue { } fn list_to_array_of_size(arr: &dyn Array, size: usize) -> Result { - let arrays = repeat_n(arr, size).collect::>(); - let ret = match !arrays.is_empty() { - true => arrow::compute::concat(arrays.as_slice())?, - false => arr.slice(0, 0), - }; - Ok(ret) + if size == 0 { + return Ok(arr.slice(0, 0)); + } + + let n = arr.len() as u32; + let indices = UInt32Array::from_iter_values((0..size).flat_map(|_| 0..n)); + Ok(arrow::compute::take(arr, &indices, None)?) } /// Retrieve ScalarValue for each row in `array` @@ -5532,6 +5533,79 @@ mod tests { assert_eq!(empty_array.len(), 0); } + #[test] + fn test_to_array_of_size_list_size_one() { + // size=1 takes the fast path (Arc::clone) + let arr = ListArray::from_iter_primitive::(vec![Some(vec![ + Some(10), + Some(20), + ])]); + let sv = ScalarValue::List(Arc::new(arr.clone())); + let result = sv.to_array_of_size(1).unwrap(); + assert_eq!(result.as_list::(), &arr); + } + + #[test] + fn test_to_array_of_size_list_empty_inner() { + // A list scalar containing an empty list: [[]] + let arr = ListArray::from_iter_primitive::(vec![Some(vec![])]); + let sv = ScalarValue::List(Arc::new(arr)); + let result = sv.to_array_of_size(3).unwrap(); + let result_list = result.as_list::(); + assert_eq!(result_list.len(), 3); + for i in 0..3 { + assert_eq!(result_list.value(i).len(), 0); + } + } + + #[test] + fn test_to_array_of_size_large_list() { + let arr = + LargeListArray::from_iter_primitive::(vec![Some(vec![ + Some(100), + Some(200), + ])]); + let sv = ScalarValue::LargeList(Arc::new(arr)); + let result = sv.to_array_of_size(3).unwrap(); + let expected = LargeListArray::from_iter_primitive::(vec![ + Some(vec![Some(100), Some(200)]), + Some(vec![Some(100), Some(200)]), + Some(vec![Some(100), Some(200)]), + ]); + assert_eq!(result.as_list::(), &expected); + } + + #[test] + fn test_list_to_array_of_size_multi_row() { + // Call list_to_array_of_size directly with arr.len() > 1 + let arr = Int32Array::from(vec![Some(10), None, Some(30)]); + let result = ScalarValue::list_to_array_of_size(&arr, 3).unwrap(); + let result = result.as_primitive::(); + assert_eq!( + result.iter().collect::>(), + vec![ + Some(10), + None, + Some(30), + Some(10), + None, + Some(30), + Some(10), + None, + Some(30), + ] + ); + } + + #[test] + fn test_to_array_of_size_null_list() { + let dt = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))); + let sv = ScalarValue::try_from(&dt).unwrap(); + let result = sv.to_array_of_size(3).unwrap(); + assert_eq!(result.len(), 3); + assert_eq!(result.null_count(), 3); + } + /// See https://github.com/apache/datafusion/issues/18870 #[test] fn test_to_array_of_size_for_none_fsb() {