diff --git a/analytic_engine/src/sst/parquet/encoding.rs b/analytic_engine/src/sst/parquet/encoding.rs
index 523f7dbdd1..53156145da 100644
--- a/analytic_engine/src/sst/parquet/encoding.rs
+++ b/analytic_engine/src/sst/parquet/encoding.rs
@@ -161,16 +161,6 @@ pub enum Error {
         type_name: String,
         backtrace: Backtrace,
     },
-
-    #[snafu(display(
-        "Hybrid format doesn't support variable length type, type:{}.\nBacktrace:\n{}",
-        type_name,
-        backtrace
-    ))]
-    VariableLengthType {
-        type_name: String,
-        backtrace: Backtrace,
-    },
 }
 
 define_result!(Error);
@@ -355,14 +345,6 @@ impl HybridRecordEncoder {
             }
 
             if schema.is_collapsible_column(idx) {
-                // TODO: support variable length type
-                ensure!(
-                    col.data_type.size().is_some(),
-                    VariableLengthType {
-                        type_name: col.data_type.to_string(),
-                    }
-                );
-
                 collapsible_col_types.push(IndexedType {
                     idx,
                     data_type: schema.column(idx).data_type,
@@ -788,6 +770,12 @@ mod tests {
                     .unwrap(),
             )
             .unwrap()
+            .add_normal_column(
+                column_schema::Builder::new("string_value".to_string(), DatumKind::String)
+                    .build()
+                    .unwrap(),
+            )
+            .unwrap()
             .build()
             .unwrap()
     }
@@ -882,7 +870,7 @@ mod tests {
     }
 
     #[test]
-    fn encode_hybrid_record_and_decode_back() {
+    fn hybrid_record_encode_and_decode() {
         let write_props = WriterProperties::builder().build();
         let schema = build_schema();
         let mut encoder = HybridRecordEncoder::try_new(write_props, &schema).unwrap();
@@ -903,6 +891,12 @@ mod tests {
                 Some("region2"),
             ]),
             int32_array(vec![Some(1), Some(2), Some(11), Some(12)]),
+            string_array(vec![
+                Some("string_value1"),
+                Some("string_value2"),
+                Some("string_value3"),
+                Some("string_value4"),
+            ]),
         ];
 
         let input_record_batch =
diff --git a/analytic_engine/src/sst/parquet/hybrid.rs b/analytic_engine/src/sst/parquet/hybrid.rs
index 19a75eeea6..544050139d 100644
--- a/analytic_engine/src/sst/parquet/hybrid.rs
+++ b/analytic_engine/src/sst/parquet/hybrid.rs
@@ -3,9 +3,12 @@
 use std::{collections::BTreeMap, sync::Arc};
 
 use arrow_deps::arrow::{
-    array::{Array, ArrayData, ArrayRef, ListArray, StringArray, UInt64Array},
+    array::{
+        Array, ArrayData, ArrayDataBuilder, ArrayRef, BinaryArray, ListArray, StringArray,
+        UInt64Array,
+    },
     bitmap::Bitmap,
-    buffer::MutableBuffer,
+    buffer::{Buffer, MutableBuffer},
     datatypes::Schema as ArrowSchema,
     record_batch::RecordBatch as ArrowRecordBatch,
     util::bit_util,
@@ -14,14 +17,26 @@ use common_types::{
     datum::DatumKind,
     schema::{ArrowSchemaRef, DataType, Field, Schema},
 };
-use log::debug;
-use snafu::ResultExt;
+use snafu::{Backtrace, ResultExt, Snafu};
 
 use crate::sst::builder::{EncodeRecordBatch, Result};
 
 // hard coded in https://github.com/apache/arrow-rs/blob/20.0.0/arrow/src/array/array_list.rs#L185
 const LIST_ITEM_NAME: &str = "item";
 
+#[derive(Debug, Snafu)]
+pub enum Error {
+    #[snafu(display(
+        "Hybrid format only supports variable length types UTF8 and Binary, current type:{:?}.\nBacktrace:\n{}",
+        type_name,
+        backtrace
+    ))]
+    VariableLengthType {
+        type_name: DataType,
+        backtrace: Backtrace,
+    },
+}
+
 #[derive(Debug, Clone, Copy)]
 struct SliceArg {
     offset: usize,
@@ -34,7 +49,7 @@ struct SliceArg {
 /// Note:
 /// 1. Array.slice(offset, length) don't work as expected, since the
 /// underlying buffer is still shared without slice.
-/// 2. Array shoule be [fixed-size primitive](https://arrow.apache.org/docs/format/Columnar.html#fixed-size-primitive-layout)
+/// 2. Array should be [fixed-size primitive](https://arrow.apache.org/docs/format/Columnar.html#fixed-size-primitive-layout)
 #[derive(Debug, Clone)]
 struct ArrayHandle {
     array: ArrayRef,
@@ -130,6 +145,44 @@ pub fn build_hybrid_arrow_schema(schema: &Schema) -> ArrowSchemaRef {
     ))
 }
 
+struct StringArrayWrapper<'a>(&'a StringArray);
+struct BinaryArrayWrapper<'a>(&'a BinaryArray);
+
+/// VariableSizeArray is a trait for variable-size arrays, such as StringArray
+/// and BinaryArray, that deals with their underlying buffer data.
+///
+/// The arrow-rs library provides no common trait for accessing the buffer data
+/// of variable-size arrays, so we define one here.
+trait VariableSizeArray {
+    // Returns the offset values in the offsets buffer.
+    fn value_offsets(&self) -> &[i32];
+    // Returns the length for the element at index i.
+    fn value_length(&self, index: usize) -> i32;
+    // Returns a clone of the value data buffer.
+    fn value_data(&self) -> Buffer;
+}
+
+macro_rules! impl_offsets {
+    ($array: ty) => {
+        impl<'a> VariableSizeArray for $array {
+            fn value_offsets(&self) -> &[i32] {
+                self.0.value_offsets()
+            }
+
+            fn value_length(&self, index: usize) -> i32 {
+                self.0.value_length(index)
+            }
+
+            fn value_data(&self) -> Buffer {
+                self.0.value_data()
+            }
+        }
+    };
+}
+
+impl_offsets!(StringArrayWrapper<'a>);
+impl_offsets!(BinaryArrayWrapper<'a>);
+
 /// ListArrayBuilder is used for concat slice of different Arrays represented by
 /// ArrayHandle into one ListArray
 struct ListArrayBuilder {
@@ -146,12 +199,8 @@ impl ListArrayBuilder {
     }
 
     fn build_child_data(&self, offsets: &mut MutableBuffer) -> Result<ArrayData> {
-        let data_type_size = self
-            .datum_kind
-            .size()
-            .expect("checked in HybridRecordEncoder::try_new");
         let values_num = self.list_of_arrays.iter().map(|handle| handle.len()).sum();
-        let mut values = MutableBuffer::new(values_num * data_type_size);
+
         // Initialize null_buffer with all 1, so we don't need to set it when array's
         // null_bitmap is None
         //
@@ -160,9 +209,7 @@ impl ListArrayBuilder {
         let null_slice = null_buffer.as_slice_mut();
 
         let mut length_so_far: i32 = 0;
-        offsets.push(length_so_far);
         for array_handle in &self.list_of_arrays {
-            let shared_buffer = array_handle.data_slice();
             let null_bitmap = array_handle.null_bitmap();
 
             for slice_arg in &array_handle.slice_args {
@@ -179,23 +226,15 @@ impl ListArrayBuilder {
                     }
                 }
                 length_so_far += length as i32;
-                values.extend_from_slice(
-                    &shared_buffer[offset * data_type_size..(offset + length) * data_type_size],
-                );
             }
-            offsets.push(length_so_far);
         }
 
-        debug!(
-            "build_child_data offsets:{:?}, values:{:?}",
-            offsets.as_slice(),
-            values.as_slice()
-        );
-
-        let values_array_data = ArrayData::builder(self.datum_kind.to_arrow_data_type())
+        let mut builder = ArrayData::builder(self.datum_kind.to_arrow_data_type())
             .len(values_num)
-            .add_buffer(values.into())
-            .null_bit_buffer(Some(null_buffer.into()))
+            .null_bit_buffer(Some(null_buffer.into()));
+
+        builder = self.apply_child_data_buffer(builder, offsets)?;
+        let values_array_data = builder
             .build()
             .map_err(|e| Box::new(e) as _)
             .context(EncodeRecordBatch)?;
@@ -203,6 +242,162 @@ impl ListArrayBuilder {
         Ok(values_array_data)
     }
 
+    fn apply_child_data_buffer(
+        &self,
+        mut builder: ArrayDataBuilder,
+        offsets: &mut MutableBuffer,
+    ) -> Result<ArrayDataBuilder> {
+        let (inner_offsets, values) = if let Some(data_type_size) = self.datum_kind.size() {
+            (
+                None,
+                self.build_fixed_size_array_buffer(offsets, data_type_size),
+            )
+        } else {
+            let (inner_offsets, values) = self.build_variable_size_array_buffer(offsets)?;
+            (Some(inner_offsets), values)
+        };
+
+        if let Some(buffer) = inner_offsets {
+            builder = builder.add_buffer(buffer.into());
+        }
+        builder = builder.add_buffer(values.into());
+        Ok(builder)
+    }
+
+    fn build_fixed_size_array_buffer(
+        &self,
+        offsets: &mut MutableBuffer,
+        data_type_size: usize,
+    ) -> MutableBuffer {
+        let mut length_so_far: i32 = 0;
+        offsets.push(length_so_far);
+
+        let values_num: usize = self.list_of_arrays.iter().map(|handle| handle.len()).sum();
+        let mut values = MutableBuffer::new(values_num * data_type_size);
+
+        for array_handle in &self.list_of_arrays {
+            let shared_buffer = array_handle.data_slice();
+
+            for slice_arg in &array_handle.slice_args {
+                let offset = slice_arg.offset;
+                let length = slice_arg.length;
+                length_so_far += length as i32;
+
+                values.extend_from_slice(
+                    &shared_buffer[offset * data_type_size..(offset + length) * data_type_size],
+                );
+            }
+            offsets.push(length_so_far);
+        }
+        values
+    }
+
+    /// Returns (offsets_buffer, values_buffer) according to the Arrow
+    /// `Variable-size Binary Layout`. Refer to https://arrow.apache.org/docs/format/Columnar.html#variable-size-binary-layout.
+    fn build_variable_size_array_buffer(
+        &self,
+        offsets: &mut MutableBuffer,
+    ) -> Result<(MutableBuffer, MutableBuffer)> {
+        let mut length_so_far: i32 = 0;
+        offsets.push(length_so_far);
+
+        let (offsets_length_total, values_length_total) =
+            self.compute_variable_size_array_buffer_length()?;
+
+        let mut inner_values = MutableBuffer::new(values_length_total as usize);
+        let mut inner_offsets = MutableBuffer::new(offsets_length_total);
+
+        self.build_variable_size_array_buffer_data(
+            &mut length_so_far,
+            offsets,
+            &mut inner_offsets,
+            &mut inner_values,
+        )?;
+        Ok((inner_offsets, inner_values))
+    }
+
+    fn convert_to_variable_size_array<'a>(
+        &self,
+        array_handle: &'a ArrayHandle,
+    ) -> Result<Box<dyn VariableSizeArray + 'a>> {
+        match self.datum_kind.to_arrow_data_type() {
+            DataType::Utf8 => Ok(Box::new(StringArrayWrapper(
+                array_handle
+                    .array
+                    .as_any()
+                    .downcast_ref::<StringArray>()
+                    .expect("downcast StringArray failed"),
+            ))),
+            DataType::Binary => Ok(Box::new(BinaryArrayWrapper(
+                array_handle
+                    .array
+                    .as_any()
+                    .downcast_ref::<BinaryArray>()
+                    .expect("downcast BinaryArray failed"),
+            ))),
+            typ => VariableLengthType { type_name: typ }
+                .fail()
+                .map_err(|e| Box::new(e) as _)
+                .context(EncodeRecordBatch),
+        }
+    }
+
+    /// Returns (offsets_length_total, values_length_total).
+    #[inline]
+    fn compute_variable_size_array_buffer_length(&self) -> Result<(usize, usize)> {
+        let mut offsets_length_total = 0;
+        let mut values_length_total = 0;
+
+        for array_handle in &self.list_of_arrays {
+            let array = self.convert_to_variable_size_array(array_handle)?;
+            for slice_arg in &array_handle.slice_args {
+                let start = array.value_offsets()[slice_arg.offset];
+                let end = array.value_offsets()[slice_arg.offset + slice_arg.length];
+
+                offsets_length_total += slice_arg.length;
+                values_length_total += (end - start) as usize;
+            }
+        }
+        Ok((offsets_length_total, values_length_total))
+    }
+
+    /// Build variable-size array buffer data.
+    ///
+    /// length_so_far and offsets are used for the parent buffer data.
+    /// inner_offsets and inner_values are used for the child buffer data.
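+    ///
+    /// For example, merging two ArrayHandles whose slices cover ["bb", "ccc"] and
+    /// ["a"] pushes [2, 3] onto offsets (after its initial 0), while inner_offsets
+    /// becomes [0, 2, 5, 6] and inner_values holds the bytes of "bbccca".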
+    fn build_variable_size_array_buffer_data(
+        &self,
+        length_so_far: &mut i32,
+        offsets: &mut MutableBuffer,
+        inner_offsets: &mut MutableBuffer,
+        inner_values: &mut MutableBuffer,
+    ) -> Result<()> {
+        let mut inner_length_so_far: i32 = 0;
+        inner_offsets.push(inner_length_so_far);
+
+        for array_handle in &self.list_of_arrays {
+            let array = self.convert_to_variable_size_array(array_handle)?;
+            for slice_arg in &array_handle.slice_args {
+                *length_so_far += slice_arg.length as i32;
+
+                let start = array.value_offsets()[slice_arg.offset];
+                let end = array.value_offsets()[slice_arg.offset + slice_arg.length];
+
+                for i in (slice_arg.offset as usize)..(slice_arg.offset + slice_arg.length as usize)
+                {
+                    inner_length_so_far += array.value_length(i);
+                    inner_offsets.push(inner_length_so_far);
+                }
+
+                inner_values.extend_from_slice(
+                    &array.value_data().as_slice()[start as usize..end as usize],
+                );
+            }
+            offsets.push(*length_so_far);
+        }
+        Ok(())
+    }
+
     /// This function is a translation of [GenericListArray.from_iter_primitive](https://docs.rs/arrow/20.0.0/src/arrow/array/array_list.rs.html#151)
     fn build(self) -> Result<ListArray> {
         let array_len = self.list_of_arrays.len();
@@ -232,7 +427,7 @@ impl ListArrayBuilder {
 }
 
 /// Builds hybrid record by concat timestamp and non key columns into
-/// `ListArray`
+/// `ListArray`.
 fn build_hybrid_record(
     arrow_schema: ArrowSchemaRef,
     tsid_type: &IndexedType,
@@ -372,6 +567,7 @@ pub fn convert_to_hybrid_record(
 mod tests {
     use arrow_deps::arrow::{
         array::{TimestampMillisecondArray, UInt16Array},
+        buffer::Buffer,
         datatypes::{TimestampMillisecondType, UInt16Type},
     };
 
@@ -396,6 +592,12 @@ mod tests {
         Arc::new(arr)
     }
 
+    fn string_array(values: Vec<Option<&str>>) -> ArrayRef {
+        let arr: StringArray = values.into_iter().collect();
+
+        Arc::new(arr)
+    }
+
     #[test]
     fn merge_timestamp_array_to_list() {
         let list_of_arrays = vec![
@@ -423,20 +625,37 @@
 
     #[test]
     fn merge_u16_array_with_none_to_list() {
-        let list_of_arrays = vec![ArrayHandle::with_slice_args(
-            uint16_array(vec![
-                Some(1),
-                Some(2),
-                None,
-                Some(3),
-                Some(4),
-                Some(5),
-                Some(6),
-            ]),
-            vec![(1, 3).into(), (4, 1).into()],
-        )];
-
-        let data = vec![Some(vec![Some(2), None, Some(3), Some(4)])];
+        let list_of_arrays = vec![
+            ArrayHandle::with_slice_args(
+                uint16_array(vec![
+                    Some(1),
+                    Some(2),
+                    None,
+                    Some(3),
+                    Some(4),
+                    Some(5),
+                    Some(6),
+                ]),
+                vec![(1, 3).into(), (4, 1).into()],
+            ),
+            ArrayHandle::with_slice_args(
+                uint16_array(vec![
+                    Some(1),
+                    Some(2),
+                    None,
+                    Some(3),
+                    Some(4),
+                    Some(5),
+                    Some(6),
+                ]),
+                vec![(0, 1).into()],
+            ),
+        ];
+
+        let data = vec![
+            Some(vec![Some(2), None, Some(3), Some(4)]),
+            Some(vec![Some(1)]),
+        ];
         let expected = ListArray::from_iter_primitive::<UInt16Type, _, _>(data);
         let list_array = ListArrayBuilder::new(DatumKind::UInt16, list_of_arrays)
             .build()
@@ -444,4 +663,53 @@
             .unwrap();
         assert_eq!(list_array, expected);
     }
+
+    #[test]
+    fn merge_string_array_with_none_to_list() {
+        let list_of_arrays = vec![
+            ArrayHandle::with_slice_args(
+                string_array(vec![
+                    Some("a"),
+                    Some("bb"),
+                    None,
+                    Some("ccc"),
+                    Some("d"),
+                    Some("eeee"),
+                    Some("eee"),
+                ]),
+                vec![(1, 3).into(), (4, 1).into()],
+            ),
+            ArrayHandle::with_slice_args(
+                string_array(vec![
+                    Some("a"),
+                    Some("bb"),
+                    None,
+                    Some("ccc"),
+                    Some("d"),
+                    Some("eeee"),
+                    Some("eee"),
+                ]),
+                vec![(0, 1).into()],
+            ),
+        ];
+
+        let string_data = string_array(vec![Some("bb"), None, Some("ccc"), Some("d"), Some("a")]);
+        let offsets: [i32; 3] = [0, 4, 5];
+        let array_data = ArrayData::builder(DataType::List(Box::new(Field::new(
+            LIST_ITEM_NAME,
+            DataType::Utf8,
+            true,
+        ))))
+        .len(2)
+        .add_buffer(Buffer::from_slice_ref(&offsets))
+        .add_child_data(string_data.data().to_owned())
+        .build()
+        .unwrap();
+        let expected = ListArray::from(array_data);
+        let list_array = ListArrayBuilder::new(DatumKind::String, list_of_arrays)
+            .build()
+            .unwrap();
+
+        assert_eq!(list_array, expected);
+    }
 }
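
Note: the following is an illustrative, standalone sketch (not part of the patch) of the Arrow variable-size binary layout that build_variable_size_array_buffer reproduces; it assumes the plain arrow crate at the 20.0 version that arrow_deps re-exports. The offsets/values pair shown here is exactly what the builder re-bases and concatenates for every slice:

    use arrow::array::StringArray;

    fn main() {
        // ["bb", None, "ccc"] is stored as:
        //   validity bitmap: 1 0 1
        //   offsets buffer:  [0, 2, 2, 5]  (element i spans offsets[i]..offsets[i + 1])
        //   values buffer:   b"bbccc"
        let array: StringArray = vec![Some("bb"), None, Some("ccc")].into_iter().collect();

        assert_eq!(array.value_offsets(), &[0, 2, 2, 5]);
        assert_eq!(array.value_data().as_slice(), b"bbccc");
        assert_eq!(array.value_length(2), 3);
    }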