diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index db011c4be43a..5b7d9ac8fbe9 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -66,8 +66,8 @@ use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion_physical_plan::union::UnionExec; use datafusion_physical_plan::{ - displayable, get_plan_string, DisplayAs, DisplayFormatType, ExecutionPlanProperties, - PlanProperties, Statistics, + displayable, DisplayAs, DisplayFormatType, ExecutionPlanProperties, PlanProperties, + Statistics, }; use insta::Settings; @@ -469,83 +469,6 @@ impl TestConfig { self } - // This be deleted in https://github.com/apache/datafusion/pull/18185 - /// Perform a series of runs using the current [`TestConfig`], - /// assert the expected plan result, - /// and return the result plan (for potential subsequent runs). - fn run( - &self, - expected_lines: &[&str], - plan: Arc, - optimizers_to_run: &[Run], - ) -> Result> { - let expected_lines: Vec<&str> = expected_lines.to_vec(); - - // Add the ancillary output requirements operator at the start: - let optimizer = OutputRequirements::new_add_mode(); - let mut optimized = optimizer.optimize(plan.clone(), &self.config)?; - - // This file has 2 rules that use tree node, apply these rules to original plan consecutively - // After these operations tree nodes should be in a consistent state. - // This code block makes sure that these rules doesn't violate tree node integrity. - { - let adjusted = if self.config.optimizer.top_down_join_key_reordering { - // Run adjust_input_keys_ordering rule - let plan_requirements = - PlanWithKeyRequirements::new_default(plan.clone()); - let adjusted = plan_requirements - .transform_down(adjust_input_keys_ordering) - .data() - .and_then(check_integrity)?; - // TODO: End state payloads will be checked here. - adjusted.plan - } else { - // Run reorder_join_keys_to_inputs rule - plan.clone() - .transform_up(|plan| { - Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?)) - }) - .data()? - }; - - // Then run ensure_distribution rule - DistributionContext::new_default(adjusted) - .transform_up(|distribution_context| { - ensure_distribution(distribution_context, &self.config) - }) - .data() - .and_then(check_integrity)?; - // TODO: End state payloads will be checked here. - } - - for run in optimizers_to_run { - optimized = match run { - Run::Distribution => { - let optimizer = EnforceDistribution::new(); - optimizer.optimize(optimized, &self.config)? - } - Run::Sorting => { - let optimizer = EnforceSorting::new(); - optimizer.optimize(optimized, &self.config)? - } - }; - } - - // Remove the ancillary output requirements operator when done: - let optimizer = OutputRequirements::new_remove_mode(); - let optimized = optimizer.optimize(optimized, &self.config)?; - - // Now format correctly - let actual_lines = get_plan_string(&optimized); - - assert_eq!( - &expected_lines, &actual_lines, - "\n\nexpected:\n\n{expected_lines:#?}\nactual:\n\n{actual_lines:#?}\n\n" - ); - - Ok(optimized) - } - /// Perform a series of runs using the current [`TestConfig`], /// assert the expected plan result, /// and return the result plan (for potential subsequent runs). 
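Note: the deleted `run` helper above compared optimized plans against hand-maintained vectors of expected strings; the hunks that follow port its call sites to insta inline snapshots via `to_plan` plus `assert_plan!`. As a rough, self-contained sketch of the insta pattern those call sites rely on (the filter regex, loop values, and rendered string below are illustrative only, and plain `insta::assert_snapshot!` stands in for the file's `assert_plan!` macro):

use insta::Settings;

#[test]
fn snapshot_pattern_sketch() {
    // One inline snapshot covers every loop iteration: the varying fragment is
    // normalized away by a filter (the real tests filter "join_type={join_type}").
    let mut settings = Settings::clone_current();
    settings.add_filter(r"join_type=\w+", "join_type=...");

    insta::allow_duplicates! {
        settings.bind(|| {
            for join_type in ["Inner", "Left"] {
                let rendered =
                    format!("SortMergeJoin: join_type={join_type}, on=[(a@0, c@2)]");
                insta::assert_snapshot!(rendered, @"SortMergeJoin: join_type=..., on=[(a@0, c@2)]");
            }
        });
    }
}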
@@ -1503,15 +1426,6 @@ fn multi_smj_joins() -> Result<()> { for join_type in join_types { let join = sort_merge_join_exec(left.clone(), right.clone(), &join_on, &join_type); - let join_plan = |shift| -> String { - format!( - "{}SortMergeJoin: join_type={join_type}, on=[(a@0, b1@1)]", - " ".repeat(shift) - ) - }; - let join_plan_indent2 = join_plan(2); - let join_plan_indent6 = join_plan(6); - let join_plan_indent10 = join_plan(10); // Top join on (a == c) let top_join_on = vec![( @@ -1520,235 +1434,246 @@ fn multi_smj_joins() -> Result<()> { )]; let top_join = sort_merge_join_exec(join.clone(), parquet_exec(), &top_join_on, &join_type); - let top_join_plan = - format!("SortMergeJoin: join_type={join_type}, on=[(a@0, c@2)]"); - - let expected = match join_type { - // Should include 6 RepartitionExecs (3 hash, 3 round-robin), 3 SortExecs - JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => - vec![ - top_join_plan.as_str(), - &join_plan_indent2, - " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - " SortExec: expr=[b1@1 ASC], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - " SortExec: expr=[c@2 ASC], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - ], - // Should include 7 RepartitionExecs (4 hash, 3 round-robin), 4 SortExecs - // Since ordering of the left child is not preserved after SortMergeJoin - // when mode is Right, RightSemi, RightAnti, Full - // - We need to add one additional SortExec after SortMergeJoin in contrast the test cases - // when mode is Inner, Left, LeftSemi, LeftAnti - // Similarly, since partitioning of the left side is not preserved - // when mode is Right, RightSemi, RightAnti, Full - // - We need to add one additional Hash Repartition after SortMergeJoin in contrast the test - // cases when mode is Inner, Left, LeftSemi, LeftAnti - _ => vec![ - top_join_plan.as_str(), - // Below 2 operators are differences introduced, when join mode is changed - " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - &join_plan_indent6, - " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - " SortExec: expr=[b1@1 ASC], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", - " DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet", - " SortExec: expr=[c@2 ASC], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - ], - }; - // TODO(wiedld): show different test result if enforce sorting first. - test_config.run(&expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?; - - let expected_first_sort_enforcement = match join_type { - // Should include 6 RepartitionExecs (3 hash, 3 round-robin), 3 SortExecs - JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => - vec![ - top_join_plan.as_str(), - &join_plan_indent2, - " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - " RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " SortExec: expr=[b1@1 ASC], preserve_partitioning=[false]", - " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - ], - // Should include 8 RepartitionExecs (4 hash, 8 round-robin), 4 SortExecs - // Since ordering of the left child is not preserved after SortMergeJoin - // when mode is Right, RightSemi, RightAnti, Full - // - We need to add one additional SortExec after SortMergeJoin in contrast the test cases - // when mode is Inner, Left, LeftSemi, LeftAnti - // Similarly, since partitioning of the left side is not preserved - // when mode is Right, RightSemi, RightAnti, Full - // - We need to add one additional Hash Repartition and Roundrobin repartition after - // SortMergeJoin in contrast the test cases when mode is Inner, Left, LeftSemi, LeftAnti - _ => vec![ - top_join_plan.as_str(), - // Below 4 operators are differences introduced, when join mode is changed - " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - " CoalescePartitionsExec", - &join_plan_indent10, - " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - " RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " SortExec: expr=[b1@1 
ASC], preserve_partitioning=[false]", - " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - ], - }; - // TODO(wiedld): show different test result if enforce distribution first. - test_config.run( - &expected_first_sort_enforcement, - top_join, - &SORT_DISTRIB_DISTRIB, - )?; - match join_type { - JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => { - // This time we use (b1 == c) for top join - // Join on (b1 == c) - let top_join_on = vec![( - Arc::new(Column::new_with_schema("b1", &join.schema()).unwrap()) as _, - Arc::new(Column::new_with_schema("c", &schema()).unwrap()) as _, - )]; - let top_join = - sort_merge_join_exec(join, parquet_exec(), &top_join_on, &join_type); - let top_join_plan = - format!("SortMergeJoin: join_type={join_type}, on=[(b1@6, c@2)]"); - - let expected = match join_type { - // Should include 6 RepartitionExecs(3 hash, 3 round-robin) and 3 SortExecs - JoinType::Inner | JoinType::Right => vec![ - top_join_plan.as_str(), - &join_plan_indent2, - " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - " SortExec: expr=[b1@1 ASC], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - " SortExec: expr=[c@2 ASC], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - ], - // Should include 7 RepartitionExecs (4 hash, 3 round-robin) and 4 SortExecs - JoinType::Left | JoinType::Full => vec![ - top_join_plan.as_str(), - " SortExec: expr=[b1@6 ASC], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10", - &join_plan_indent6, - " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - " SortExec: expr=[b1@1 ASC], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - " SortExec: expr=[c@2 ASC], 
preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - ], - // this match arm cannot be reached - _ => unreachable!() - }; - // TODO(wiedld): show different test result if enforce sorting first. - test_config.run(&expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?; - - let expected_first_sort_enforcement = match join_type { - // Should include 6 RepartitionExecs (3 of them preserves order) and 3 SortExecs - JoinType::Inner | JoinType::Right => vec![ - top_join_plan.as_str(), - &join_plan_indent2, - " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - " RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " SortExec: expr=[b1@1 ASC], preserve_partitioning=[false]", - " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - ], - // Should include 8 RepartitionExecs (4 of them preserves order) and 4 SortExecs - JoinType::Left | JoinType::Full => vec![ - top_join_plan.as_str(), - " RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@6 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " SortExec: expr=[b1@6 ASC], preserve_partitioning=[false]", - " CoalescePartitionsExec", - &join_plan_indent10, - " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - " RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " SortExec: expr=[b1@1 ASC], preserve_partitioning=[false]", - " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - ], - // this match arm cannot be reached - _ => unreachable!() - }; + let mut settings = 
Settings::clone_current(); + settings.add_filter(&format!("join_type={join_type}"), "join_type=..."); - // TODO(wiedld): show different test result if enforce distribution first. - test_config.run( - &expected_first_sort_enforcement, - top_join, - &SORT_DISTRIB_DISTRIB, - )?; - } - _ => {} + #[rustfmt::skip] + insta::allow_duplicates! { + settings.bind(|| { + let plan_distrib = test_config.to_plan(top_join.clone(), &DISTRIB_DISTRIB_SORT); + + match join_type { + // Should include 6 RepartitionExecs (3 hash, 3 round-robin), 3 SortExecs + JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => { + assert_plan!(plan_distrib, @r" +SortMergeJoin: join_type=..., on=[(a@0, c@2)] + SortMergeJoin: join_type=..., on=[(a@0, b1@1)] + SortExec: expr=[a@0 ASC], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + SortExec: expr=[b1@1 ASC], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + SortExec: expr=[c@2 ASC], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet +"); + } + // Should include 7 RepartitionExecs (4 hash, 3 round-robin), 4 SortExecs + // Since ordering of the left child is not preserved after SortMergeJoin + // when mode is Right, RightSemi, RightAnti, Full + // - We need to add one additional SortExec after SortMergeJoin in contrast the test cases + // when mode is Inner, Left, LeftSemi, LeftAnti + // Similarly, since partitioning of the left side is not preserved + // when mode is Right, RightSemi, RightAnti, Full + // - We need to add one additional Hash Repartition after SortMergeJoin in contrast the test + // cases when mode is Inner, Left, LeftSemi, LeftAnti + _ => { + assert_plan!(plan_distrib, @r" +SortMergeJoin: join_type=..., on=[(a@0, c@2)] + SortExec: expr=[a@0 ASC], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10 + SortMergeJoin: join_type=..., on=[(a@0, b1@1)] + SortExec: expr=[a@0 ASC], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + SortExec: expr=[b1@1 ASC], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + SortExec: expr=[c@2 ASC], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], 
file_type=parquet +"); + } + } + + let plan_sort = test_config.to_plan(top_join.clone(), &SORT_DISTRIB_DISTRIB); + + match join_type { + // Should include 6 RepartitionExecs (3 hash, 3 round-robin), 3 SortExecs + JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => { + // TODO(wiedld): show different test result if enforce distribution first. + assert_plan!(plan_sort, @r" +SortMergeJoin: join_type=..., on=[(a@0, c@2)] + SortMergeJoin: join_type=..., on=[(a@0, b1@1)] + RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + SortExec: expr=[b1@1 ASC], preserve_partitioning=[false] + ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + SortExec: expr=[c@2 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet +"); + } + // Should include 8 RepartitionExecs (4 hash, 4 round-robin), 4 SortExecs + // Since ordering of the left child is not preserved after SortMergeJoin + // when mode is Right, RightSemi, RightAnti, Full + // - We need to add one additional SortExec after SortMergeJoin in contrast to the test cases + // when mode is Inner, Left, LeftSemi, LeftAnti + // Similarly, since partitioning of the left side is not preserved + // when mode is Right, RightSemi, RightAnti, Full + // - We need to add one additional Hash Repartition and Roundrobin repartition after + // SortMergeJoin in contrast to the test cases when mode is Inner, Left, LeftSemi, LeftAnti + _ => { + // TODO(wiedld): show different test result if enforce distribution first.
+ assert_plan!(plan_sort, @r" +SortMergeJoin: join_type=..., on=[(a@0, c@2)] + RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + CoalescePartitionsExec + SortMergeJoin: join_type=..., on=[(a@0, b1@1)] + RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + SortExec: expr=[b1@1 ASC], preserve_partitioning=[false] + ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + SortExec: expr=[c@2 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet +"); + } + } + + match join_type { + JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => { + // This time we use (b1 == c) for top join + // Join on (b1 == c) + let top_join_on = vec![( + Arc::new(Column::new_with_schema("b1", &join.schema()).unwrap()) as _, + Arc::new(Column::new_with_schema("c", &schema()).unwrap()) as _, + )]; + let top_join = sort_merge_join_exec(join, parquet_exec(), &top_join_on, &join_type); + + let plan_distrib = test_config.to_plan(top_join.clone(), &DISTRIB_DISTRIB_SORT); + + match join_type { + // Should include 6 RepartitionExecs(3 hash, 3 round-robin) and 3 SortExecs + JoinType::Inner | JoinType::Right => { + // TODO(wiedld): show different test result if enforce sorting first. + assert_plan!(plan_distrib, @r" +SortMergeJoin: join_type=..., on=[(b1@6, c@2)] + SortMergeJoin: join_type=..., on=[(a@0, b1@1)] + SortExec: expr=[a@0 ASC], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + SortExec: expr=[b1@1 ASC], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + SortExec: expr=[c@2 ASC], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet +"); + } + // Should include 7 RepartitionExecs (4 hash, 3 round-robin) and 4 SortExecs + JoinType::Left | JoinType::Full => { + // TODO(wiedld): show different test result if enforce sorting first. 
+ assert_plan!(plan_distrib, @r" +SortMergeJoin: join_type=..., on=[(b1@6, c@2)] + SortExec: expr=[b1@6 ASC], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10 + SortMergeJoin: join_type=..., on=[(a@0, b1@1)] + SortExec: expr=[a@0 ASC], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + SortExec: expr=[b1@1 ASC], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + SortExec: expr=[c@2 ASC], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet +"); + } + // this match arm cannot be reached + _ => unreachable!() + } + + let plan_sort = test_config.to_plan(top_join, &SORT_DISTRIB_DISTRIB); + + match join_type { + // Should include 6 RepartitionExecs (3 of them preserves order) and 3 SortExecs + JoinType::Inner | JoinType::Right => { + // TODO(wiedld): show different test result if enforce distribution first. + assert_plan!(plan_sort, @r" +SortMergeJoin: join_type=..., on=[(b1@6, c@2)] + SortMergeJoin: join_type=..., on=[(a@0, b1@1)] + RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + SortExec: expr=[b1@1 ASC], preserve_partitioning=[false] + ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + SortExec: expr=[c@2 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet +"); + } + // Should include 8 RepartitionExecs (4 of them preserves order) and 4 SortExecs + JoinType::Left | JoinType::Full => { + // TODO(wiedld): show different test result if enforce distribution first. 
+ assert_plan!(plan_sort, @r" +SortMergeJoin: join_type=..., on=[(b1@6, c@2)] + RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@6 ASC + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + SortExec: expr=[b1@6 ASC], preserve_partitioning=[false] + CoalescePartitionsExec + SortMergeJoin: join_type=..., on=[(a@0, b1@1)] + RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + SortExec: expr=[b1@1 ASC], preserve_partitioning=[false] + ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + SortExec: expr=[c@2 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet +"); + } + // this match arm cannot be reached + _ => unreachable!() + } + } + _ => {} + } + }); } } - Ok(()) } @@ -2667,46 +2592,51 @@ fn parallelization_compressed_csv() -> Result<()> { FileCompressionType::UNCOMPRESSED, ]; - let expected_not_partitioned = [ - "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", - " RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2", - " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", - " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", - ]; - - let expected_partitioned = [ - "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", - " RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2", - " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", - " DataSourceExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", - ]; + #[rustfmt::skip] + insta::allow_duplicates! 
{ + for compression_type in compression_types { + let plan = aggregate_exec_with_alias( + DataSourceExec::from_data_source( + FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test:///").unwrap(), + schema(), + Arc::new(CsvSource::new(false, b',', b'"')), + ) + .with_file(PartitionedFile::new("x".to_string(), 100)) + .with_file_compression_type(compression_type) + .build(), + ), + vec![("a".to_string(), "a".to_string())], + ); + let test_config = TestConfig::default() + .with_query_execution_partitions(2) + .with_prefer_repartition_file_scans(10); + + let plan_distrib = test_config.to_plan(plan.clone(), &DISTRIB_DISTRIB_SORT); + if compression_type.is_compressed() { + // Compressed files cannot be partitioned + assert_plan!(plan_distrib, + @r" +AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[] + RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2 + AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[] + RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false +"); + } else { + // Uncompressed files can be partitioned + assert_plan!(plan_distrib, + @r" +AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[] + RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2 + AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[] + DataSourceExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false +"); + } - for compression_type in compression_types { - let expected = if compression_type.is_compressed() { - &expected_not_partitioned[..] - } else { - &expected_partitioned[..] - }; - - let plan = aggregate_exec_with_alias( - DataSourceExec::from_data_source( - FileScanConfigBuilder::new( - ObjectStoreUrl::parse("test:///").unwrap(), - schema(), - Arc::new(CsvSource::new(false, b',', b'"')), - ) - .with_file(PartitionedFile::new("x".to_string(), 100)) - .with_file_compression_type(compression_type) - .build(), - ), - vec![("a".to_string(), "a".to_string())], - ); - let test_config = TestConfig::default() - .with_query_execution_partitions(2) - .with_prefer_repartition_file_scans(10); - test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; - test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; + let plan_sort = test_config.to_plan(plan, &SORT_DISTRIB_DISTRIB); + assert_plan!(plan_distrib, plan_sort); + } } Ok(()) }
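Note on the closing `assert_plan!(plan_distrib, plan_sort)` above: unlike the single-plan snapshot form, the two-argument call presumably asserts that both optimizer orderings (`DISTRIB_DISTRIB_SORT` and `SORT_DISTRIB_DISTRIB`) produce the same physical plan. A minimal sketch of that check, assuming the comparison is done on the rendered plan trees (the helper name is hypothetical; only `displayable` and its `indent` renderer come from the imports kept at the top of this diff):

use std::sync::Arc;

use datafusion_physical_plan::{displayable, ExecutionPlan};

/// Hypothetical helper: assert that two physical plans render to the same indented tree.
fn assert_same_plan(left: &Arc<dyn ExecutionPlan>, right: &Arc<dyn ExecutionPlan>) {
    let l = displayable(left.as_ref()).indent(true).to_string();
    let r = displayable(right.as_ref()).indent(true).to_string();
    assert_eq!(l, r, "plans differ:\nleft:\n{l}\nright:\n{r}");
}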