diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index c28e56790e66..9157ab609777 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -985,7 +985,7 @@ impl DefaultPhysicalPlanner {
                     struct_type_columns.clone(),
                     schema,
                     options.clone(),
-                ))
+                )?)
             }

             // 2 Children
diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs
index e36cd2b6c242..026a7fbcd0e5 100644
--- a/datafusion/physical-plan/src/unnest.rs
+++ b/datafusion/physical-plan/src/unnest.rs
@@ -32,8 +32,8 @@ use crate::{
 };

 use arrow::array::{
-    new_null_array, Array, ArrayRef, AsArray, FixedSizeListArray, Int64Array,
-    LargeListArray, ListArray, PrimitiveArray, Scalar, StructArray,
+    new_null_array, Array, ArrayRef, AsArray, BooleanBufferBuilder, FixedSizeListArray,
+    Int64Array, LargeListArray, ListArray, PrimitiveArray, Scalar, StructArray,
 };
 use arrow::compute::kernels::length::length;
 use arrow::compute::kernels::zip::zip;
@@ -43,16 +43,19 @@ use arrow::record_batch::RecordBatch;
 use arrow_ord::cmp::lt;
 use async_trait::async_trait;
 use datafusion_common::{
-    exec_datafusion_err, exec_err, internal_err, HashMap, HashSet, Result, UnnestOptions,
+    exec_datafusion_err, exec_err, internal_err, Constraints, HashMap, HashSet, Result,
+    UnnestOptions,
 };
 use datafusion_execution::TaskContext;
-use datafusion_physical_expr::EquivalenceProperties;
+use datafusion_physical_expr::equivalence::ProjectionMapping;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::PhysicalExpr;
 use futures::{Stream, StreamExt};
 use log::trace;

 /// Unnest the given columns (either with type struct or list)
-/// For list unnesting, each rows is vertically transformed into multiple rows
-/// For struct unnesting, each columns is horizontally transformed into multiple columns,
+/// For list unnesting, each row is vertically transformed into multiple rows
+/// For struct unnesting, each column is horizontally transformed into multiple columns,
 /// Thus the original RecordBatch with dimension (n x m) may have new dimension (n' x m')
 ///
 /// See [`UnnestOptions`] for more details and an example.
@@ -82,10 +85,15 @@ impl UnnestExec {
         struct_column_indices: Vec<usize>,
         schema: SchemaRef,
         options: UnnestOptions,
-    ) -> Self {
-        let cache = Self::compute_properties(&input, Arc::clone(&schema));
+    ) -> Result<Self> {
+        let cache = Self::compute_properties(
+            &input,
+            &list_column_indices,
+            &struct_column_indices,
+            Arc::clone(&schema),
+        )?;

-        UnnestExec {
+        Ok(UnnestExec {
             input,
             schema,
             list_column_indices,
@@ -93,20 +101,78 @@ impl UnnestExec {
             options,
             metrics: Default::default(),
             cache,
-        }
+        })
     }

     /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
     fn compute_properties(
         input: &Arc<dyn ExecutionPlan>,
+        list_column_indices: &[ListUnnest],
+        struct_column_indices: &[usize],
         schema: SchemaRef,
-    ) -> PlanProperties {
-        PlanProperties::new(
-            EquivalenceProperties::new(schema),
-            input.output_partitioning().to_owned(),
+    ) -> Result<PlanProperties> {
+        // Find out which indices are not unnested, such that they can be copied over from the input plan
+        let input_schema = input.schema();
+        let mut unnested_indices = BooleanBufferBuilder::new(input_schema.fields().len());
+        unnested_indices.append_n(input_schema.fields().len(), false);
+        for list_unnest in list_column_indices {
+            unnested_indices.set_bit(list_unnest.index_in_input_schema, true);
+        }
+        for struct_unnest in struct_column_indices {
+            unnested_indices.set_bit(*struct_unnest, true)
+        }
+        let unnested_indices = unnested_indices.finish();
+        let non_unnested_indices: Vec<usize> = (0..input_schema.fields().len())
+            .filter(|idx| !unnested_indices.value(*idx))
+            .collect();
+
+        // Manually build projection mapping from non-unnested input columns to their positions in the output
+        let input_schema = input.schema();
+        let projection_mapping: ProjectionMapping = non_unnested_indices
+            .iter()
+            .map(|&input_idx| {
+                // Find what index the input column has in the output schema
+                let input_field = input_schema.field(input_idx);
+                let output_idx = schema
+                    .fields()
+                    .iter()
+                    .position(|output_field| output_field.name() == input_field.name())
+                    .ok_or_else(|| {
+                        exec_datafusion_err!(
+                            "Non-unnested column '{}' must exist in output schema",
+                            input_field.name()
+                        )
+                    })?;
+
+                let input_col = Arc::new(Column::new(input_field.name(), input_idx))
+                    as Arc<dyn PhysicalExpr>;
+                let target_col = Arc::new(Column::new(input_field.name(), output_idx))
+                    as Arc<dyn PhysicalExpr>;
+                // Use From<Vec<(Arc<dyn PhysicalExpr>, usize)>> for ProjectionTargets
+                let targets = vec![(target_col, output_idx)].into();
+                Ok((input_col, targets))
+            })
+            .collect::<Result<_>>()?;
+
+        // Create the unnest's equivalence properties by copying the input plan's equivalence properties
+        // for the unaffected columns. Except for the constraints, which are removed entirely because
+        // the unnest operation invalidates any global uniqueness or primary-key constraints.
+        let input_eq_properties = input.equivalence_properties();
+        let eq_properties = input_eq_properties
+            .project(&projection_mapping, Arc::clone(&schema))
+            .with_constraints(Constraints::default());
+
+        // Output partitioning must use the projection mapping
+        let output_partitioning = input
+            .output_partitioning()
+            .project(&projection_mapping, &eq_properties);
+
+        Ok(PlanProperties::new(
+            eq_properties,
+            output_partitioning,
             input.pipeline_behavior(),
             input.boundedness(),
-        )
+        ))
     }

     /// Input execution plan
@@ -173,7 +239,7 @@ impl ExecutionPlan for UnnestExec {
             self.struct_column_indices.clone(),
             Arc::clone(&self.schema),
             self.options.clone(),
-        )))
+        )?))
     }

     fn required_input_distribution(&self) -> Vec<Distribution> {
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index d76bcc89b3db..80e7eb39d867 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -1757,7 +1757,7 @@ impl protobuf::PhysicalPlanNode {
                 unnest.struct_type_columns.iter().map(|c| *c as _).collect(),
                 Arc::new(convert_required!(unnest.schema)?),
                 into_required!(unnest.options)?,
-            )))
+            )?))
         }

 fn generate_series_name_to_str(name: protobuf::GenerateSeriesName) -> &'static str {
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index b93d0d3c4e7c..d19dcdbf24ce 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -1714,7 +1714,7 @@ fn roundtrip_unnest() -> Result<()> {
         vec![2, 4],
         output_schema,
         options,
-    );
+    )?;

     roundtrip_test(Arc::new(unnest))
 }
diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt
index 67b3a7cf5666..38fcc1ba9016 100644
--- a/datafusion/sqllogictest/test_files/unnest.slt
+++ b/datafusion/sqllogictest/test_files/unnest.slt
@@ -941,3 +941,275 @@ where min_height * width1 = (
 )
 ----
 4 7 4 28
+
+## Unnest with ordering on unrelated column is preserved
+query TT
+EXPLAIN WITH unnested AS (SELECT
+    ROW_NUMBER() OVER () AS generated_id,
+    unnest(array[value]) as ar
+    FROM range(1,5)) SELECT array_agg(ar) FROM unnested group by generated_id;
+----
+logical_plan
+01)Projection: array_agg(unnested.ar)
+02)--Aggregate: groupBy=[[unnested.generated_id]], aggr=[[array_agg(unnested.ar)]]
+03)----SubqueryAlias: unnested
+04)------Projection: generated_id, __unnest_placeholder(make_array(range().value),depth=1) AS UNNEST(make_array(range().value)) AS ar
+05)--------Unnest: lists[__unnest_placeholder(make_array(range().value))|depth=1] structs[]
+06)----------Projection: row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS generated_id, make_array(range().value) AS __unnest_placeholder(make_array(range().value))
+07)------------WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+08)--------------TableScan: range() projection=[value]
+physical_plan
+01)ProjectionExec: expr=[array_agg(unnested.ar)@1 as array_agg(unnested.ar)]
+02)--AggregateExec: mode=FinalPartitioned, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
+03)----SortExec: expr=[generated_id@0 ASC NULLS LAST], preserve_partitioning=[true]
+04)------CoalesceBatchesExec: target_batch_size=8192
+05)--------RepartitionExec: partitioning=Hash([generated_id@0], 4), input_partitions=4
+06)----------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
+07)------------ProjectionExec: expr=[generated_id@0 as generated_id, __unnest_placeholder(make_array(range().value),depth=1)@1 as ar]
+08)--------------UnnestExec
+09)----------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as __unnest_placeholder(make_array(range().value))]
+10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+11)--------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { name: "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
+12)----------------------LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192]
+
+# Unnest array where data is already ordered by column2 (100, 200, 300, 400)
+statement ok
+COPY (
+  SELECT * FROM VALUES
+    ([1,2,3], 100),
+    ([3], 200),
+    ([], 300),
+    ([3,1], 400)
+  ORDER BY column2
+  ) TO 'test_files/scratch/unnest/ordered_array.parquet';
+
+statement ok
+CREATE EXTERNAL TABLE t
+STORED AS PARQUET
+LOCATION 'test_files/scratch/unnest/ordered_array.parquet'
+WITH ORDER (column2)
+
+query ?I
+SELECT * FROM t;
+----
+[1, 2, 3] 100
+[3] 200
+[] 300
+[3, 1] 400
+
+# Data is sorted on column2 already, so no need to sort again
+query II
+SELECT UNNEST(column1), column2 FROM t ORDER BY column2;
+----
+1 100
+2 100
+3 100
+3 200
+3 400
+1 400
+
+# Explain should not have a SortExec
+query TT
+EXPLAIN SELECT UNNEST(column1), column2 FROM t ORDER BY column2;
+----
+logical_plan
+01)Sort: t.column2 ASC NULLS LAST
+02)--Projection: __unnest_placeholder(t.column1,depth=1) AS UNNEST(t.column1), t.column2
+03)----Unnest: lists[__unnest_placeholder(t.column1)|depth=1] structs[]
+04)------Projection: t.column1 AS __unnest_placeholder(t.column1), t.column2
+05)--------TableScan: t projection=[column1, column2]
+physical_plan
+01)ProjectionExec: expr=[__unnest_placeholder(t.column1,depth=1)@0 as UNNEST(t.column1), column2@1 as column2]
+02)--UnnestExec
+03)----ProjectionExec: expr=[column1@0 as __unnest_placeholder(t.column1), column2@1 as column2]
+04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/unnest/ordered_array.parquet]]}, projection=[column1, column2], output_ordering=[column2@1 ASC NULLS LAST], file_type=parquet
+
+# Explain should have a SortExec at the top because we order by the output of the unnest (i.e. discarding the ordering)
+query TT
+EXPLAIN SELECT UNNEST(column1) as unnested, column2 FROM t ORDER BY 1;
+----
+logical_plan
+01)Sort: unnested ASC NULLS LAST
+02)--Projection: __unnest_placeholder(t.column1,depth=1) AS UNNEST(t.column1) AS unnested, t.column2
+03)----Unnest: lists[__unnest_placeholder(t.column1)|depth=1] structs[]
+04)------Projection: t.column1 AS __unnest_placeholder(t.column1), t.column2
+05)--------TableScan: t projection=[column1, column2]
+physical_plan
+01)SortExec: expr=[unnested@0 ASC NULLS LAST], preserve_partitioning=[false]
+02)--ProjectionExec: expr=[__unnest_placeholder(t.column1,depth=1)@0 as unnested, column2@1 as column2]
+03)----UnnestExec
+04)------ProjectionExec: expr=[column1@0 as __unnest_placeholder(t.column1), column2@1 as column2]
+05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/unnest/ordered_array.parquet]]}, projection=[column1, column2], output_ordering=[column2@1 ASC NULLS LAST], file_type=parquet
+
+# cleanup
+statement ok
+drop table t;
+
+# Unnest tuple where the data is already sorted by column 1
+statement ok
+COPY (
+  SELECT * FROM VALUES
+    (100, [3,2,1], 'a'),
+    (200, [1,2,3], 'b'),
+    (300, [3,1,2], 'c')
+  ORDER BY column1
+  ) TO 'test_files/scratch/unnest/ordered_tuples.parquet';
+
+statement ok
+CREATE EXTERNAL TABLE t
+STORED AS PARQUET
+LOCATION 'test_files/scratch/unnest/ordered_tuples.parquet'
+WITH ORDER (column1)
+
+query I?T
+SELECT * FROM t;
+----
+100 [3, 2, 1] a
+200 [1, 2, 3] b
+300 [3, 1, 2] c
+
+# Put the columns in a tuple and unnest, we need to sort because we discard ordering of unnested columns
+query TT
+EXPLAIN WITH unnested AS (
+    SELECT unnest((column1, column2, column3))
+    FROM t
+) SELECT * FROM unnested order by 1;
+----
+logical_plan
+01)Sort: unnested.__unnest_placeholder(struct(t.column1,t.column2,t.column3)).c0 ASC NULLS LAST
+02)--SubqueryAlias: unnested
+03)----Unnest: lists[] structs[__unnest_placeholder(struct(t.column1,t.column2,t.column3))]
+04)------Projection: struct(t.column1, t.column2, t.column3) AS __unnest_placeholder(struct(t.column1,t.column2,t.column3))
+05)--------TableScan: t projection=[column1, column2, column3]
+physical_plan
+01)SortExec: expr=[__unnest_placeholder(struct(t.column1,t.column2,t.column3)).c0@0 ASC NULLS LAST], preserve_partitioning=[false]
+02)--UnnestExec
+03)----ProjectionExec: expr=[struct(column1@0, column2@1, column3@2) as __unnest_placeholder(struct(t.column1,t.column2,t.column3))]
+04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/unnest/ordered_tuples.parquet]]}, projection=[column1, column2, column3], output_ordering=[column1@0 ASC NULLS LAST], file_type=parquet
+
+# cleanup
+statement ok
+drop table t;
+
+# Unnest struct where data is already ordered by column2 (100, 200, 300, 400)
+statement ok
+COPY (
+  SELECT * FROM VALUES
+    (named_struct('s1', 1, 's2', 2, 's3', 3), 100),
+    (named_struct('s1', 1, 's2', 3, 's3', 2), 200),
+    (named_struct('s1', 2, 's2', 1, 's3', 3), 300),
+    (named_struct('s1', 3, 's2', 2, 's3', 1), 400)
+  ORDER BY column2
+  ) TO 'test_files/scratch/unnest/ordered_struct.parquet';
+
+statement ok
+CREATE EXTERNAL TABLE t
+STORED AS PARQUET
+LOCATION 'test_files/scratch/unnest/ordered_struct.parquet'
+WITH ORDER (column2)
+
+query ?I
+SELECT * FROM t;
+----
+{s1: 1, s2: 2, s3: 3} 100
+{s1: 1, s2: 3, s3: 2} 200
+{s1: 2, s2: 1, s3: 3} 300
+{s1: 3, s2: 2, s3: 1} 400
+
+# data is sorted on column2 already, so no need to sort again
+query IIII
+SELECT UNNEST(column1), column2 FROM t ORDER BY column2;
+----
+1 2 3 100
+1 3 2 200
+2 1 3 300
+3 2 1 400
+
+# Explain should not have a SortExec
+query TT
+EXPLAIN SELECT UNNEST(column1), column2 FROM t ORDER BY column2;
+----
+logical_plan
+01)Sort: t.column2 ASC NULLS LAST
+02)--Unnest: lists[] structs[__unnest_placeholder(t.column1)]
+03)----Projection: t.column1 AS __unnest_placeholder(t.column1), t.column2
+04)------TableScan: t projection=[column1, column2]
+physical_plan
+01)UnnestExec
+02)--ProjectionExec: expr=[column1@0 as __unnest_placeholder(t.column1), column2@1 as column2]
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/unnest/ordered_struct.parquet]]}, projection=[column1, column2], output_ordering=[column2@1 ASC NULLS LAST], file_type=parquet
+
+# cleanup
+statement ok
+drop table t;
+
+# Unnest nested array (unnesting twice), struct, and array, where data is already ordered by column4 (100, 200, 300, 400)
+statement ok
+COPY (
+  SELECT * FROM VALUES
+    ([[1],[2],[3]], [1,2,3], named_struct('s1', 1, 's2', 2, 's3', 3), 100),
+    ([[1],[3],[2]], [3], named_struct('s1', 1, 's2', 3, 's3', 2), 200),
+    ([[2],[1],[3]], [], named_struct('s1', 2, 's2', 1, 's3', 3), 300),
+    ([[3],[2],[1]], [3,1], named_struct('s1', 3, 's2', 2, 's3', 1), 400)
+  ORDER BY column4
+  ) TO 'test_files/scratch/unnest/ordered_struct_arrays.parquet';
+
+statement ok
+CREATE EXTERNAL TABLE t
+STORED AS PARQUET
+LOCATION 'test_files/scratch/unnest/ordered_struct_arrays.parquet'
+WITH ORDER (column4)
+
+query ???I
+SELECT * FROM t;
+----
+[[1], [2], [3]] [1, 2, 3] {s1: 1, s2: 2, s3: 3} 100
+[[1], [3], [2]] [3] {s1: 1, s2: 3, s3: 2} 200
+[[2], [1], [3]] [] {s1: 2, s2: 1, s3: 3} 300
+[[3], [2], [1]] [3, 1] {s1: 3, s2: 2, s3: 1} 400
+
+# data is sorted on column4 already, so no need to sort again
+query IIIIII
+SELECT UNNEST(UNNEST(column1)), UNNEST(column2), UNNEST(column3), column4 FROM t ORDER BY column4;
+----
+1 1 1 2 3 100
+NULL 2 1 2 3 100
+NULL 3 1 2 3 100
+2 1 1 2 3 100
+NULL 2 1 2 3 100
+NULL 3 1 2 3 100
+3 1 1 2 3 100
+NULL 2 1 2 3 100
+NULL 3 1 2 3 100
+1 3 1 3 2 200
+3 3 1 3 2 200
+2 3 1 3 2 200
+2 NULL 2 1 3 300
+1 NULL 2 1 3 300
+3 NULL 2 1 3 300
+3 3 3 2 1 400
+NULL 1 3 2 1 400
+2 3 3 2 1 400
+NULL 1 3 2 1 400
+1 3 3 2 1 400
+NULL 1 3 2 1 400
+
+# Explain should not have a SortExec
+query TT
+EXPLAIN SELECT UNNEST(UNNEST(column1)), UNNEST(column2), UNNEST(column3), column4 FROM t ORDER BY column4;
+----
+logical_plan
+01)Sort: t.column4 ASC NULLS LAST
+02)--Projection: __unnest_placeholder(t.column1,depth=2) AS UNNEST(UNNEST(t.column1)), __unnest_placeholder(t.column2,depth=1) AS UNNEST(t.column2), __unnest_placeholder(t.column3).s1, __unnest_placeholder(t.column3).s2, __unnest_placeholder(t.column3).s3, t.column4
+03)----Unnest: lists[__unnest_placeholder(t.column1)|depth=2, __unnest_placeholder(t.column2)|depth=1] structs[__unnest_placeholder(t.column3)]
+04)------Projection: t.column1 AS __unnest_placeholder(t.column1), t.column2 AS __unnest_placeholder(t.column2), t.column3 AS __unnest_placeholder(t.column3), t.column4
+05)--------TableScan: t projection=[column1, column2, column3, column4]
+physical_plan
+01)ProjectionExec: expr=[__unnest_placeholder(t.column1,depth=2)@0 as UNNEST(UNNEST(t.column1)), __unnest_placeholder(t.column2,depth=1)@1 as UNNEST(t.column2), __unnest_placeholder(t.column3).s1@2 as __unnest_placeholder(t.column3).s1, __unnest_placeholder(t.column3).s2@3 as __unnest_placeholder(t.column3).s2, __unnest_placeholder(t.column3).s3@4 as __unnest_placeholder(t.column3).s3, column4@5 as column4]
+02)--UnnestExec
+03)----ProjectionExec: expr=[column1@0 as __unnest_placeholder(t.column1), column2@1 as __unnest_placeholder(t.column2), column3@2 as __unnest_placeholder(t.column3), column4@3 as column4]
+04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/unnest/ordered_struct_arrays.parquet]]}, projection=[column1, column2, column3, column4], output_ordering=[column4@3 ASC NULLS LAST], file_type=parquet
+
+# cleanup
+statement ok
+drop table t;
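
For reviewers: the heart of this change is the bookkeeping in `compute_properties` that separates unnested columns from pass-through columns, so the latter can keep the input's equivalence properties (and thus ordering/partitioning). Below is a minimal, standalone sketch of that bookkeeping using only the `BooleanBufferBuilder` API the patch imports; the helper name `non_unnested_indices` and the example indices are hypothetical and for illustration only, not part of the patch.

```rust
use arrow::array::BooleanBufferBuilder;

/// Hypothetical standalone helper mirroring the index bookkeeping in
/// `UnnestExec::compute_properties`: mark every unnested input column in a
/// boolean mask, then collect the complement (the pass-through columns).
fn non_unnested_indices(num_fields: usize, unnested: &[usize]) -> Vec<usize> {
    let mut mask = BooleanBufferBuilder::new(num_fields);
    // Start with every column marked as "not unnested".
    mask.append_n(num_fields, false);
    // Flip the bit for each unnested column (list or struct alike).
    for &idx in unnested {
        mask.set_bit(idx, true);
    }
    let mask = mask.finish();
    // The remaining columns are copied over from the input plan, which is
    // what lets their ordering and partitioning properties be preserved.
    (0..num_fields).filter(|&idx| !mask.value(idx)).collect()
}

fn main() {
    // A 4-column input where columns 0 and 2 are unnested: columns 1 and 3
    // pass through unchanged and keep their equivalence properties.
    assert_eq!(non_unnested_indices(4, &[0, 2]), vec![1, 3]);
}
```

The new sqllogictest cases exercise exactly this: an ordering on a pass-through column (`column2`/`column4`) now survives the `UnnestExec`, so the optimizer can elide the `SortExec`, while ordering by the unnest output itself still requires a sort.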