Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 70 additions & 9 deletions datafusion/physical-plan/src/unnest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ use crate::{
};

use arrow::array::{
new_null_array, Array, ArrayRef, AsArray, FixedSizeListArray, Int64Array,
LargeListArray, ListArray, PrimitiveArray, Scalar, StructArray,
new_null_array, Array, ArrayRef, AsArray, BooleanBufferBuilder, FixedSizeListArray,
Int64Array, LargeListArray, ListArray, PrimitiveArray, Scalar, StructArray,
};
use arrow::compute::kernels::length::length;
use arrow::compute::kernels::zip::zip;
Expand All @@ -43,16 +43,19 @@ use arrow::record_batch::RecordBatch;
use arrow_ord::cmp::lt;
use async_trait::async_trait;
use datafusion_common::{
exec_datafusion_err, exec_err, internal_err, HashMap, HashSet, Result, UnnestOptions,
exec_datafusion_err, exec_err, internal_err, Constraints, HashMap, HashSet, Result,
UnnestOptions,
};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_expr::equivalence::ProjectionMapping;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalExpr;
use futures::{Stream, StreamExt};
use log::trace;

/// Unnest the given columns (either with type struct or list)
/// For list unnesting, each rows is vertically transformed into multiple rows
/// For struct unnesting, each columns is horizontally transformed into multiple columns,
/// For list unnesting, each row is vertically transformed into multiple rows
/// For struct unnesting, each column is horizontally transformed into multiple columns,
Comment on lines -54 to +58
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Grammar fix

/// Thus the original RecordBatch with dimension (n x m) may have new dimension (n' x m')
///
/// See [`UnnestOptions`] for more details and an example.
Expand Down Expand Up @@ -83,7 +86,12 @@ impl UnnestExec {
schema: SchemaRef,
options: UnnestOptions,
) -> Self {
let cache = Self::compute_properties(&input, Arc::clone(&schema));
let cache = Self::compute_properties(
&input,
&list_column_indices,
&struct_column_indices,
Arc::clone(&schema),
);

UnnestExec {
input,
Expand All @@ -99,11 +107,64 @@ impl UnnestExec {
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(
input: &Arc<dyn ExecutionPlan>,
list_column_indices: &[ListUnnest],
struct_column_indices: &[usize],
schema: SchemaRef,
) -> PlanProperties {
// Find out which indices are not unnested, such that they can be copied over from the input plan
let input_schema = input.schema();
let mut unnested_indices = BooleanBufferBuilder::new(input_schema.fields().len());
unnested_indices.append_n(input_schema.fields().len(), false);
for list_unnest in list_column_indices {
unnested_indices.set_bit(list_unnest.index_in_input_schema, true);
}
for list_unnest in struct_column_indices {
unnested_indices.set_bit(*list_unnest, true)
}
let unnested_indices = unnested_indices.finish();
let non_unnested_indices: Vec<usize> = (0..input_schema.fields().len())
.filter(|idx| !unnested_indices.value(*idx))
.collect();

// Manually build projection mapping from non-unnested input columns to their positions in the output
let input_schema = input.schema();
let projection_mapping: ProjectionMapping = non_unnested_indices
.iter()
.map(|&input_idx| {
// Find what index the input column has in the output schema
let input_field = input_schema.field(input_idx);
let output_idx = schema
.fields()
.iter()
.position(|output_field| output_field.name() == input_field.name())
.expect("Non-unnested column must exist in output schema");

let input_col = Arc::new(Column::new(input_field.name(), input_idx))
as Arc<dyn PhysicalExpr>;
let target_col = Arc::new(Column::new(input_field.name(), output_idx))
as Arc<dyn PhysicalExpr>;
// Use From<Vec<(Arc<dyn PhysicalExpr>, usize)>> for ProjectionTargets
let targets = vec![(target_col, output_idx)].into();
(input_col, targets)
})
.collect(); // Use FromIterator to collect the projection mapping

// Create the unnest's equivalence properties by copying the input plan's equivalence properties
// for the unaffected columns. Except for the constraints, which are removed entirely because
// the unnest operation invalidates any global uniqueness or primary-key constraints.
let input_eq_properties = input.equivalence_properties();
let eq_properties = input_eq_properties
.project(&projection_mapping, Arc::clone(&schema))
.with_constraints(Constraints::default());

// Output partitioning must use the projection mapping
let output_partitioning = input
.output_partitioning()
.project(&projection_mapping, &eq_properties);

PlanProperties::new(
EquivalenceProperties::new(schema),
input.output_partitioning().to_owned(),
eq_properties,
output_partitioning,
input.pipeline_behavior(),
input.boundedness(),
)
Expand Down
239 changes: 239 additions & 0 deletions datafusion/sqllogictest/test_files/unnest.slt
Original file line number Diff line number Diff line change
Expand Up @@ -941,3 +941,242 @@ where min_height * width1 = (
)
----
4 7 4 28

## Unnest with ordering on unrelated column is preserved
query TT
EXPLAIN WITH unnested AS (SELECT
ROW_NUMBER() OVER () AS generated_id,
unnest(array[value]) as ar
FROM range(1,5)) SELECT array_agg(ar) FROM unnested group by generated_id;
----
logical_plan
01)Projection: array_agg(unnested.ar)
02)--Aggregate: groupBy=[[unnested.generated_id]], aggr=[[array_agg(unnested.ar)]]
03)----SubqueryAlias: unnested
04)------Projection: generated_id, __unnest_placeholder(make_array(range().value),depth=1) AS UNNEST(make_array(range().value)) AS ar
05)--------Unnest: lists[__unnest_placeholder(make_array(range().value))|depth=1] structs[]
06)----------Projection: row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS generated_id, make_array(range().value) AS __unnest_placeholder(make_array(range().value))
07)------------WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
08)--------------TableScan: range() projection=[value]
physical_plan
01)ProjectionExec: expr=[array_agg(unnested.ar)@1 as array_agg(unnested.ar)]
02)--AggregateExec: mode=FinalPartitioned, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
03)----SortExec: expr=[generated_id@0 ASC NULLS LAST], preserve_partitioning=[true]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this plan shows the data being sorted, but the comment suggests it should not be 🤔

Could you please explain in more detail what you expect this explain plan to be showing? Given there is no ORDER BY in the query (or in the OVER clause) it is not clear why this is testing ordering

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, this test is from your comment here #15231 (comment)

By "the comment" here do you mean "Unnest with ordering on unrelated column is preserved"?

The point of the test is to show that we don't lose the inherent ordering of generated_id (through row_number()) when we do unnest, so if we look at 11) BoundedWindowAggExec, we see mode=[Sorted] and then we also see ordering_mode=Sorted at 06) AggregateExec, and at 02) AggregateExec. I will include this in the comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, sorry, I understand your comment now. You're saying it is being sorted at 03 here, and the comment is saying it shouldn't. I agree. Good catch. How did I make this regression happen? For sure it was not doing this sorting in a previous iteration of this PR...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You actually pointed this test case out here as well and I didn't fix it, I just added your new test case (which does pass, thankfully) #16985 (comment)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not able to understand. The change in this PR indeed adds SortExec step in 3, making the physical plan go from

01)ProjectionExec: expr=[array_agg(unnested.ar)@1 as array_agg(unnested.ar)]
02)--AggregateExec: mode=FinalPartitioned, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)]
03)----CoalesceBatchesExec: target_batch_size=8192
04)------RepartitionExec: partitioning=Hash([generated_id@0], 4), input_partitions=4
05)--------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)]
06)----------ProjectionExec: expr=[generated_id@0 as generated_id, __unnest_placeholder(make_array(range().value),depth=1)@1 as ar]
07)------------UnnestExec
08)--------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as __unnest_placeholder(make_array(range().value))]
09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
10)------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { name: "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
11)--------------------LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192]

to

physical_plan
01)ProjectionExec: expr=[array_agg(unnested.ar)@1 as array_agg(unnested.ar)]
02)--AggregateExec: mode=FinalPartitioned, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
03)----SortExec: expr=[generated_id@0 ASC NULLS LAST], preserve_partitioning=[true]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------RepartitionExec: partitioning=Hash([generated_id@0], 4), input_partitions=4
06)----------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
07)------------ProjectionExec: expr=[generated_id@0 as generated_id, __unnest_placeholder(make_array(range().value),depth=1)@1 as ar]
08)--------------UnnestExec
09)----------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as __unnest_placeholder(make_array(range().value))]
10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
11)--------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { name: "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
12)----------------------LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like a regression, doesn't it? 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I spent some more time looking and I think your code is working as expected

Namely, Note the ordering_mode=Sorted that is above the UnnestExec - that means that the GROUP BY columns (in this case generated_id) are sorted, as expected.

06)----------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
07)------------ProjectionExec: expr=[generated_id@0 as generated_id, __unnest_placeholder(make_array(range().value),depth=1)@1 as ar]
08)--------------UnnestExec
09)----------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as __unnest_placeholder(make_array(range().value))]
10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
11)--------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { name: "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]

The reason there is a sort in the new plan is that the optimizer has decided to repartition the intermediate aggregate result (unrelated to this PR)

03)----SortExec: expr=[generated_id@0 ASC NULLS LAST], preserve_partitioning=[true] 
04)------CoalesceBatchesExec: target_batch_size=8192 
05)--------RepartitionExec: partitioning=Hash([generated_id@0], 4), input_partitions=4 
06)----------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see. That's a relief. Thank you.

04)------CoalesceBatchesExec: target_batch_size=8192
05)--------RepartitionExec: partitioning=Hash([generated_id@0], 4), input_partitions=4
06)----------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
07)------------ProjectionExec: expr=[generated_id@0 as generated_id, __unnest_placeholder(make_array(range().value),depth=1)@1 as ar]
08)--------------UnnestExec
09)----------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as __unnest_placeholder(make_array(range().value))]
10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
11)--------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { name: "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
12)----------------------LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192]

## Unnest with ordering on unrelated column is preserved
query TT
EXPLAIN WITH unnested AS (SELECT
ROW_NUMBER() OVER () AS generated_id,
unnest(array[value]) as ar
FROM range(1,5)) SELECT array_agg(ar) FROM unnested group by generated_id;
----
logical_plan
01)Projection: array_agg(unnested.ar)
02)--Aggregate: groupBy=[[unnested.generated_id]], aggr=[[array_agg(unnested.ar)]]
03)----SubqueryAlias: unnested
04)------Projection: generated_id, __unnest_placeholder(make_array(range().value),depth=1) AS UNNEST(make_array(range().value)) AS ar
05)--------Unnest: lists[__unnest_placeholder(make_array(range().value))|depth=1] structs[]
06)----------Projection: row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS generated_id, make_array(range().value) AS __unnest_placeholder(make_array(range().value))
07)------------WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
08)--------------TableScan: range() projection=[value]
physical_plan
01)ProjectionExec: expr=[array_agg(unnested.ar)@1 as array_agg(unnested.ar)]
02)--AggregateExec: mode=FinalPartitioned, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
03)----SortExec: expr=[generated_id@0 ASC NULLS LAST], preserve_partitioning=[true]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------RepartitionExec: partitioning=Hash([generated_id@0], 4), input_partitions=4
06)----------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
07)------------ProjectionExec: expr=[generated_id@0 as generated_id, __unnest_placeholder(make_array(range().value),depth=1)@1 as ar]
08)--------------UnnestExec
09)----------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as __unnest_placeholder(make_array(range().value))]
10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
11)--------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { name: "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
12)----------------------LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192]

# Unnest array where data is already ordered by column2 (100, 200, 300, 400)
statement ok
COPY (
SELECT * FROM VALUES
([1,2,3], 100),
([3], 200),
([], 300),
([3,1], 400)
ORDER BY column2
) TO 'test_files/scratch/unnest/ordered_array.parquet';

statement ok
CREATE EXTERNAL TABLE t
STORED AS PARQUET
LOCATION 'test_files/scratch/unnest/ordered_array.parquet'
WITH ORDER (column2)

query ?I
SELECT * FROM t;
----
[1, 2, 3] 100
[3] 200
[] 300
[3, 1] 400

# Data is sorted on column2 already, so no need to sort again
query II
SELECT UNNEST(column1), column2 FROM t ORDER BY column2;
----
1 100
2 100
3 100
3 200
3 400
1 400

# Explain should not have a SortExec
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also please add two additional tests:

  1. a negative test case here. order by the output of the unnest and verify that it is in fact sorted correctly
  2. A case with the ordering column as the first index (e.g. tuples like (100, [3,2,1], 'a') and then order by 100

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1: Included in 2b9bece
2: I'm not quite sure I understand what you mean to test here. In de7a558 I added a table with three columns of the form 100, [3,2,1], 'a' and then do a query where we select the tuple of these columns and unnest it. But in this case we discard it, since we (with the implementation in this PR) totally discard ordering when we unnest. I have a feeling this isn't what you meant - let me know

query TT
EXPLAIN SELECT UNNEST(column1), column2 FROM t ORDER BY column2;
----
logical_plan
01)Sort: t.column2 ASC NULLS LAST
02)--Projection: __unnest_placeholder(t.column1,depth=1) AS UNNEST(t.column1), t.column2
03)----Unnest: lists[__unnest_placeholder(t.column1)|depth=1] structs[]
04)------Projection: t.column1 AS __unnest_placeholder(t.column1), t.column2
05)--------TableScan: t projection=[column1, column2]
physical_plan
01)ProjectionExec: expr=[__unnest_placeholder(t.column1,depth=1)@0 as UNNEST(t.column1), column2@1 as column2]
02)--UnnestExec
03)----ProjectionExec: expr=[column1@0 as __unnest_placeholder(t.column1), column2@1 as column2]
04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/unnest/ordered_array.parquet]]}, projection=[column1, column2], output_ordering=[column2@1 ASC NULLS LAST], file_type=parquet

# cleanup
statement ok
drop table t;

# Unnest struct where data is already ordered by column2 (100, 200, 300, 400)
statement ok
COPY (
SELECT * FROM VALUES
(named_struct('s1', 1, 's2', 2, 's3', 3), 100),
(named_struct('s1', 1, 's2', 3, 's3', 2), 200),
(named_struct('s1', 2, 's2', 1, 's3', 3), 300),
(named_struct('s1', 3, 's2', 2, 's3', 1), 400)
ORDER BY column2
) TO 'test_files/scratch/unnest/ordered_struct.parquet';

statement ok
CREATE EXTERNAL TABLE t
STORED AS PARQUET
LOCATION 'test_files/scratch/unnest/ordered_struct.parquet'
WITH ORDER (column2)

query ?I
SELECT * FROM t;
----
{s1: 1, s2: 2, s3: 3} 100
{s1: 1, s2: 3, s3: 2} 200
{s1: 2, s2: 1, s3: 3} 300
{s1: 3, s2: 2, s3: 1} 400

# data is sorted on column2 already, so no need to sort again
query IIII
SELECT UNNEST(column1), column2 FROM t ORDER BY column2;
----
1 2 3 100
1 3 2 200
2 1 3 300
3 2 1 400

# Explain should not have a SortExec
query TT
EXPLAIN SELECT UNNEST(column1), column2 FROM t ORDER BY column2;
----
logical_plan
01)Sort: t.column2 ASC NULLS LAST
02)--Unnest: lists[] structs[__unnest_placeholder(t.column1)]
03)----Projection: t.column1 AS __unnest_placeholder(t.column1), t.column2
04)------TableScan: t projection=[column1, column2]
physical_plan
01)UnnestExec
02)--ProjectionExec: expr=[column1@0 as __unnest_placeholder(t.column1), column2@1 as column2]
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/unnest/ordered_struct.parquet]]}, projection=[column1, column2], output_ordering=[column2@1 ASC NULLS LAST], file_type=parquet

# cleanup
statement ok
drop table t;

# Unnest nested array (unnesting twice), struct, and array, where data is already ordered by column4 (100, 200, 300, 400)
statement ok
COPY (
SELECT * FROM VALUES
([[1],[2],[3]], [1,2,3], named_struct('s1', 1, 's2', 2, 's3', 3), 100),
([[1],[3],[2]], [3], named_struct('s1', 1, 's2', 3, 's3', 2), 200),
([[2],[1],[3]], [], named_struct('s1', 2, 's2', 1, 's3', 3), 300),
([[3],[2],[1]], [3,1], named_struct('s1', 3, 's2', 2, 's3', 1), 400)
ORDER BY column4
) TO 'test_files/scratch/unnest/ordered_struct_arrays.parquet';

statement ok
CREATE EXTERNAL TABLE t
STORED AS PARQUET
LOCATION 'test_files/scratch/unnest/ordered_struct_arrays.parquet'
WITH ORDER (column4)

query ???I
SELECT * FROM t;
----
[[1], [2], [3]] [1, 2, 3] {s1: 1, s2: 2, s3: 3} 100
[[1], [3], [2]] [3] {s1: 1, s2: 3, s3: 2} 200
[[2], [1], [3]] [] {s1: 2, s2: 1, s3: 3} 300
[[3], [2], [1]] [3, 1] {s1: 3, s2: 2, s3: 1} 400

# data is sorted on column4 already, so no need to sort again
query IIIIII
SELECT UNNEST(UNNEST(column1)), UNNEST(column2), UNNEST(column3), column4 FROM t ORDER BY column4;
----
1 1 1 2 3 100
NULL 2 1 2 3 100
NULL 3 1 2 3 100
2 1 1 2 3 100
NULL 2 1 2 3 100
NULL 3 1 2 3 100
3 1 1 2 3 100
NULL 2 1 2 3 100
NULL 3 1 2 3 100
1 3 1 3 2 200
3 3 1 3 2 200
2 3 1 3 2 200
2 NULL 2 1 3 300
1 NULL 2 1 3 300
3 NULL 2 1 3 300
3 3 3 2 1 400
NULL 1 3 2 1 400
2 3 3 2 1 400
NULL 1 3 2 1 400
1 3 3 2 1 400
NULL 1 3 2 1 400

# Explain should not have a SortExec
query TT
EXPLAIN SELECT UNNEST(UNNEST(column1)), UNNEST(column2), UNNEST(column3), column4 FROM t ORDER BY column4;
----
logical_plan
01)Sort: t.column4 ASC NULLS LAST
02)--Projection: __unnest_placeholder(t.column1,depth=2) AS UNNEST(UNNEST(t.column1)), __unnest_placeholder(t.column2,depth=1) AS UNNEST(t.column2), __unnest_placeholder(t.column3).s1, __unnest_placeholder(t.column3).s2, __unnest_placeholder(t.column3).s3, t.column4
03)----Unnest: lists[__unnest_placeholder(t.column1)|depth=2, __unnest_placeholder(t.column2)|depth=1] structs[__unnest_placeholder(t.column3)]
04)------Projection: t.column1 AS __unnest_placeholder(t.column1), t.column2 AS __unnest_placeholder(t.column2), t.column3 AS __unnest_placeholder(t.column3), t.column4
05)--------TableScan: t projection=[column1, column2, column3, column4]
physical_plan
01)ProjectionExec: expr=[__unnest_placeholder(t.column1,depth=2)@0 as UNNEST(UNNEST(t.column1)), __unnest_placeholder(t.column2,depth=1)@1 as UNNEST(t.column2), __unnest_placeholder(t.column3).s1@2 as __unnest_placeholder(t.column3).s1, __unnest_placeholder(t.column3).s2@3 as __unnest_placeholder(t.column3).s2, __unnest_placeholder(t.column3).s3@4 as __unnest_placeholder(t.column3).s3, column4@5 as column4]
02)--UnnestExec
03)----ProjectionExec: expr=[column1@0 as __unnest_placeholder(t.column1), column2@1 as __unnest_placeholder(t.column2), column3@2 as __unnest_placeholder(t.column3), column4@3 as column4]
04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/unnest/ordered_struct_arrays.parquet]]}, projection=[column1, column2, column3, column4], output_ordering=[column4@3 ASC NULLS LAST], file_type=parquet

# cleanup
statement ok
drop table t;