diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index f22a896c1894..ac47dc803955 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -394,29 +394,32 @@ fn test_suite_default_config_options() -> ConfigOptions { config } -/// How the optimizers are run. #[derive(PartialEq, Clone)] -enum DoFirst { - /// Runs: (EnforceDistribution, EnforceDistribution, EnforceSorting) +enum Run { Distribution, - /// Runs: (EnforceSorting, EnforceDistribution, EnforceDistribution) Sorting, } +/// Standard sets of the series of optimizer runs: +const DISTRIB_DISTRIB_SORT: [Run; 3] = + [Run::Distribution, Run::Distribution, Run::Sorting]; +const SORT_DISTRIB_DISTRIB: [Run; 3] = + [Run::Sorting, Run::Distribution, Run::Distribution]; + #[derive(Clone)] struct TestConfig { config: ConfigOptions, - optimizers_to_run: DoFirst, } -impl TestConfig { - fn new(optimizers_to_run: DoFirst) -> Self { +impl Default for TestConfig { + fn default() -> Self { Self { config: test_suite_default_config_options(), - optimizers_to_run, } } +} +impl TestConfig { /// If preferred, will not repartition / resort data if it is already sorted. fn with_prefer_existing_sort(mut self) -> Self { self.config.optimizer.prefer_existing_sort = true; @@ -442,40 +445,30 @@ impl TestConfig { self.config.execution.target_partitions = target_partitions; self } -} -/// Runs the repartition optimizer and asserts the plan against the expected -/// Arguments -/// * `EXPECTED_LINES` - Expected output plan -/// * `PLAN` - Input plan -/// * `CONFIG` - [`TestConfig`] -macro_rules! 
assert_optimized { - ($EXPECTED_LINES: expr, $PLAN: expr, $CONFIG: expr) => { - let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect(); - - let TestConfig { - config, - optimizers_to_run, - } = $CONFIG; - - // NOTE: These tests verify the joint `EnforceDistribution` + `EnforceSorting` cascade - // because they were written prior to the separation of `BasicEnforcement` into - // `EnforceSorting` and `EnforceDistribution`. - // TODO: Orthogonalize the tests here just to verify `EnforceDistribution` and create - // new tests for the cascade. + /// Perform a series of runs using the current [`TestConfig`], + /// assert the expected plan result, + /// and return the result plan (for potential subsequent runs). + fn run( + &self, + expected_lines: &[&str], + plan: Arc<dyn ExecutionPlan>, + optimizers_to_run: &[Run], + ) -> Result<Arc<dyn ExecutionPlan>> { + let expected_lines: Vec<&str> = expected_lines.to_vec(); // Add the ancillary output requirements operator at the start: let optimizer = OutputRequirements::new_add_mode(); - let optimized = optimizer.optimize($PLAN.clone(), &config)?; + let mut optimized = optimizer.optimize(plan.clone(), &self.config)?; // This file has 2 rules that use tree node, apply these rules to original plan consecutively // After these operations tree nodes should be in a consistent state. // This code block makes sure that these rules doesn't violate tree node integrity. { - let adjusted = if config.optimizer.top_down_join_key_reordering { + let adjusted = if self.config.optimizer.top_down_join_key_reordering { // Run adjust_input_keys_ordering rule let plan_requirements = - PlanWithKeyRequirements::new_default($PLAN.clone()); + PlanWithKeyRequirements::new_default(plan.clone()); let adjusted = plan_requirements .transform_down(adjust_input_keys_ordering) .data() @@ -484,51 +477,39 @@ macro_rules! 
assert_optimized { adjusted.plan } else { // Run reorder_join_keys_to_inputs rule - $PLAN.clone().transform_up(|plan| { - Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?)) - }) - .data()? + plan.clone() + .transform_up(|plan| { + Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?)) + }) + .data()? }; // Then run ensure_distribution rule DistributionContext::new_default(adjusted) .transform_up(|distribution_context| { - ensure_distribution(distribution_context, &config) + ensure_distribution(distribution_context, &self.config) }) .data() .and_then(check_integrity)?; // TODO: End state payloads will be checked here. } - let optimized = if *optimizers_to_run == DoFirst::Distribution { - // Run enforce distribution rule first: - let optimizer = EnforceDistribution::new(); - let optimized = optimizer.optimize(optimized, &config)?; - // The rule should be idempotent. - // Re-running this rule shouldn't introduce unnecessary operators. - let optimizer = EnforceDistribution::new(); - let optimized = optimizer.optimize(optimized, &config)?; - // Run the enforce sorting rule: - let optimizer = EnforceSorting::new(); - let optimized = optimizer.optimize(optimized, &config)?; - optimized - } else { - // Run the enforce sorting rule first: - let optimizer = EnforceSorting::new(); - let optimized = optimizer.optimize(optimized, &config)?; - // Run enforce distribution rule: - let optimizer = EnforceDistribution::new(); - let optimized = optimizer.optimize(optimized, &config)?; - // The rule should be idempotent. - // Re-running this rule shouldn't introduce unnecessary operators. - let optimizer = EnforceDistribution::new(); - let optimized = optimizer.optimize(optimized, &config)?; - optimized - }; + for run in optimizers_to_run { + optimized = match run { + Run::Distribution => { + let optimizer = EnforceDistribution::new(); + optimizer.optimize(optimized, &self.config)? 
+ } + Run::Sorting => { + let optimizer = EnforceSorting::new(); + optimizer.optimize(optimized, &self.config)? + } + }; + } // Remove the ancillary output requirements operator when done: let optimizer = OutputRequirements::new_remove_mode(); - let optimized = optimizer.optimize(optimized, &config)?; + let optimized = optimizer.optimize(optimized, &self.config)?; // Now format correctly let actual_lines = get_plan_string(&optimized); @@ -538,7 +519,9 @@ macro_rules! assert_optimized { "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", expected_lines, actual_lines ); - }; + + Ok(optimized) + } } macro_rules! assert_plan_txt { @@ -647,12 +630,10 @@ fn multi_hash_joins() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ], }; - assert_optimized!( - expected, - top_join.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!(expected, top_join, &TestConfig::new(DoFirst::Sorting)); + + let test_config = TestConfig::default(); + test_config.run(&expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(&expected, top_join, &SORT_DISTRIB_DISTRIB)?; } JoinType::RightSemi | JoinType::RightAnti => {} } @@ -715,12 +696,10 @@ fn multi_hash_joins() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ], }; - assert_optimized!( - expected, - top_join.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!(expected, top_join, &TestConfig::new(DoFirst::Sorting)); + + let test_config = TestConfig::default(); + test_config.run(&expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(&expected, top_join, &SORT_DISTRIB_DISTRIB)?; } JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {} } @@ -776,12 +755,9 @@ fn multi_joins_after_alias() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet", ]; - assert_optimized!( - expected, - top_join.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!(expected, top_join, &TestConfig::new(DoFirst::Sorting)); + let test_config = TestConfig::default(); + test_config.run(expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, top_join, &SORT_DISTRIB_DISTRIB)?; // Join on (a2 == c) let top_join_on = vec![( @@ -806,12 +782,9 @@ fn multi_joins_after_alias() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( - expected, - top_join.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!(expected, top_join, &TestConfig::new(DoFirst::Sorting)); + let test_config = TestConfig::default(); + test_config.run(expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, top_join, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -864,12 +837,9 @@ fn multi_joins_after_multi_alias() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( - expected, - top_join.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!(expected, top_join, &TestConfig::new(DoFirst::Sorting)); + let test_config = TestConfig::default(); + test_config.run(expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, top_join, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -908,12 +878,9 @@ fn join_after_agg_alias() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( - expected, - join.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!(expected, join, &TestConfig::new(DoFirst::Sorting)); + let test_config = 
TestConfig::default(); + test_config.run(expected, join.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, join, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -965,12 +932,9 @@ fn hash_join_key_ordering() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( - expected, - join.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!(expected, join, &TestConfig::new(DoFirst::Sorting)); + let test_config = TestConfig::default(); + test_config.run(expected, join.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, join, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -1095,16 +1059,9 @@ fn multi_hash_join_key_ordering() -> Result<()> { " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( - expected, - filter_top_join.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!( - expected, - filter_top_join, - &TestConfig::new(DoFirst::Sorting) - ); + let test_config = TestConfig::default(); + test_config.run(expected, filter_top_join.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, filter_top_join, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -1385,6 +1342,8 @@ fn reorder_join_keys_to_right_input() -> Result<()> { /// These test cases use [`TestConfig::with_prefer_existing_sort`]. 
#[test] fn multi_smj_joins() -> Result<()> { + let test_config = TestConfig::default().with_prefer_existing_sort(); + let left = parquet_exec(); let alias_pairs: Vec<(String, String)> = vec![ ("a".to_string(), "a1".to_string()), @@ -1484,11 +1443,8 @@ fn multi_smj_joins() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ], }; - assert_optimized!( - expected, - top_join.clone(), - &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort() - ); + // TODO(wiedld): show different test result if enforce sorting first. + test_config.run(&expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?; let expected_first_sort_enforcement = match join_type { // Should include 6 RepartitionExecs (3 hash, 3 round-robin), 3 SortExecs @@ -1542,11 +1498,12 @@ fn multi_smj_joins() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ], }; - assert_optimized!( - expected_first_sort_enforcement, + // TODO(wiedld): show different test result if enforce distribution first. + test_config.run( + &expected_first_sort_enforcement, top_join, - &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort() - ); + &SORT_DISTRIB_DISTRIB, + )?; match join_type { JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => { @@ -1603,11 +1560,8 @@ fn multi_smj_joins() -> Result<()> { // this match arm cannot be reached _ => unreachable!() }; - assert_optimized!( - expected, - top_join.clone(), - &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort() - ); + // TODO(wiedld): show different test result if enforce sorting first. 
+ test_config.run(&expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?; let expected_first_sort_enforcement = match join_type { // Should include 6 RepartitionExecs (3 of them preserves order) and 3 SortExecs @@ -1654,11 +1608,12 @@ fn multi_smj_joins() -> Result<()> { _ => unreachable!() }; - assert_optimized!( - expected_first_sort_enforcement, + // TODO(wiedld): show different test result if enforce distribution first. + test_config.run( + &expected_first_sort_enforcement, top_join, - &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort() - ); + &SORT_DISTRIB_DISTRIB, + )?; } _ => {} } @@ -1714,6 +1669,10 @@ fn smj_join_key_ordering() -> Result<()> { ]; let join = sort_merge_join_exec(left, right.clone(), &join_on, &JoinType::Inner); + // TestConfig: Prefer existing sort. + let test_config = TestConfig::default().with_prefer_existing_sort(); + + // Test: run EnforceDistribution, then EnforceSort. // Only two RepartitionExecs added let expected = &[ "SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]", @@ -1733,12 +1692,9 @@ fn smj_join_key_ordering() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( - expected, - join.clone(), - &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort() - ); + test_config.run(expected, join.clone(), &DISTRIB_DISTRIB_SORT)?; + // Test: result IS DIFFERENT, if EnforceSorting is run first: let expected_first_sort_enforcement = &[ "SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]", " RepartitionExec: partitioning=Hash([b3@1, a3@0], 10), input_partitions=10, preserve_order=true, sort_exprs=b3@1 ASC, a3@0 ASC", @@ -1763,11 +1719,7 @@ fn smj_join_key_ordering() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], 
file_type=parquet", ]; - assert_optimized!( - expected_first_sort_enforcement, - join, - &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort() - ); + test_config.run(expected_first_sort_enforcement, join, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -1791,6 +1743,8 @@ fn merge_does_not_need_sort() -> Result<()> { let exec: Arc = Arc::new(SortPreservingMergeExec::new(sort_key, exec)); + // Test: run EnforceDistribution, then EnforceSort. + // // The optimizer should not add an additional SortExec as the // data is already sorted let expected = &[ @@ -1798,19 +1752,22 @@ fn merge_does_not_need_sort() -> Result<()> { " CoalesceBatchesExec: target_batch_size=4096", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; - assert_optimized!(expected, exec, &TestConfig::new(DoFirst::Distribution)); + let test_config = TestConfig::default(); + test_config.run(expected, exec.clone(), &DISTRIB_DISTRIB_SORT)?; + // Test: result IS DIFFERENT, if EnforceSorting is run first: + // // In this case preserving ordering through order preserving operators is not desirable // (according to flag: PREFER_EXISTING_SORT) // hence in this case ordering lost during CoalescePartitionsExec and re-introduced with // SortExec at the top. 
- let expected = &[ + let expected_first_sort_enforcement = &[ "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", " CoalescePartitionsExec", " CoalesceBatchesExec: target_batch_size=4096", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; - assert_optimized!(expected, exec, &TestConfig::new(DoFirst::Sorting)); + test_config.run(expected_first_sort_enforcement, exec, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -1851,12 +1808,10 @@ fn union_to_interleave() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( - expected, - plan.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!(expected, plan.clone(), &TestConfig::new(DoFirst::Sorting)); + + let test_config = TestConfig::default(); + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -1899,16 +1854,11 @@ fn union_not_to_interleave() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( - expected, - plan.clone(), - &TestConfig::new(DoFirst::Distribution).with_prefer_existing_union() - ); - assert_optimized!( - expected, - plan, - &TestConfig::new(DoFirst::Sorting).with_prefer_existing_union() - ); + // TestConfig: Prefer existing union. 
+ let test_config = TestConfig::default().with_prefer_existing_union(); + + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -1925,12 +1875,10 @@ fn added_repartition_to_single_partition() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( - expected, - plan.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); + + let test_config = TestConfig::default(); + test_config.run(&expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(&expected, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -1948,12 +1896,10 @@ fn repartition_deepest_node() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( - expected, - plan.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); + + let test_config = TestConfig::default(); + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -1972,12 +1918,9 @@ fn repartition_unsorted_limit() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( - expected, - plan.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); + let test_config = TestConfig::default(); + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -1998,12 +1941,10 @@ fn repartition_sorted_limit() -> Result<()> { " 
SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( - expected, - plan.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); + + let test_config = TestConfig::default(); + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2030,12 +1971,9 @@ fn repartition_sorted_limit_with_filter() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( - expected, - plan.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); + let test_config = TestConfig::default(); + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2064,12 +2002,10 @@ fn repartition_ignores_limit() -> Result<()> { // Expect no repartition to happen for local limit " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( - expected, - plan.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); + + let test_config = TestConfig::default(); + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2088,12 +2024,9 @@ fn repartition_ignores_union() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( - expected, - plan.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); + let test_config = TestConfig::default(); + 
test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2113,12 +2046,10 @@ fn repartition_through_sort_preserving_merge() -> Result<()> { "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( - expected, - plan.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); + + let test_config = TestConfig::default(); + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2136,25 +2067,24 @@ fn repartition_ignores_sort_preserving_merge() -> Result<()> { parquet_exec_multiple_sorted(vec![sort_key]), ); + // Test: run EnforceDistribution, then EnforceSort + // // should not sort (as the data was already sorted) // should not repartition, since increased parallelism is not beneficial for SortPReservingMerge let expected = &[ "SortPreservingMergeExec: [c@2 ASC]", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; + let test_config = TestConfig::default(); + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; - assert_optimized!( - expected, - plan.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - - let expected = &[ + // Test: result IS DIFFERENT, if EnforceSorting is run first: + let expected_first_sort_enforcement = &[ "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", " CoalescePartitionsExec", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); + test_config.run(expected_first_sort_enforcement, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2170,6 +2100,8 @@ fn 
repartition_ignores_sort_preserving_merge_with_union() -> Result<()> { let input = union_exec(vec![parquet_exec_with_sort(vec![sort_key.clone()]); 2]); let plan = sort_preserving_merge_exec(sort_key, input); + // Test: run EnforceDistribution, then EnforceSort. + // // should not repartition / sort (as the data was already sorted) let expected = &[ "SortPreservingMergeExec: [c@2 ASC]", @@ -2177,21 +2109,18 @@ fn repartition_ignores_sort_preserving_merge_with_union() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; + let test_config = TestConfig::default(); + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; - assert_optimized!( - expected, - plan.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - - let expected = &[ + // test: result IS DIFFERENT, if EnforceSorting is run first: + let expected_first_sort_enforcement = &[ "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", " CoalescePartitionsExec", " UnionExec", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); + test_config.run(expected_first_sort_enforcement, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2211,6 +2140,9 @@ fn repartition_does_not_destroy_sort() -> Result<()> { sort_key, ); + // TestConfig: Prefer existing sort. 
+ let test_config = TestConfig::default().with_prefer_existing_sort(); + // during repartitioning ordering is preserved let expected = &[ "SortRequiredExec: [d@3 ASC]", @@ -2219,16 +2151,8 @@ fn repartition_does_not_destroy_sort() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[d@3 ASC], file_type=parquet", ]; - assert_optimized!( - expected, - plan.clone(), - &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort() - ); - assert_optimized!( - expected, - plan, - &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort() - ); + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2268,12 +2192,10 @@ fn repartition_does_not_destroy_sort_more_complex() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( - expected, - plan.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); + + let test_config = TestConfig::default(); + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2297,6 +2219,7 @@ fn repartition_transitively_with_projection() -> Result<()> { }]); let plan = sort_preserving_merge_exec(sort_key, proj); + // Test: run EnforceDistribution, then EnforceSort. 
let expected = &[ "SortPreservingMergeExec: [sum@0 ASC]", " SortExec: expr=[sum@0 ASC], preserve_partitioning=[true]", @@ -2305,13 +2228,10 @@ fn repartition_transitively_with_projection() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; + let test_config = TestConfig::default(); + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; - assert_optimized!( - expected, - plan.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - + // Test: result IS DIFFERENT, if EnforceSorting is run first: let expected_first_sort_enforcement = &[ "SortExec: expr=[sum@0 ASC], preserve_partitioning=[false]", " CoalescePartitionsExec", @@ -2320,11 +2240,7 @@ fn repartition_transitively_with_projection() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( - expected_first_sort_enforcement, - plan, - &TestConfig::new(DoFirst::Sorting) - ); + test_config.run(expected_first_sort_enforcement, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2356,12 +2272,10 @@ fn repartition_ignores_transitively_with_projection() -> Result<()> { " ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!( - expected, - plan.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); + + let test_config = TestConfig::default(); + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2393,12 +2307,10 @@ fn repartition_transitively_past_sort_with_projection() -> Result<()> { " ProjectionExec: expr=[a@0 as a, b@1 
as b, c@2 as c]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( - expected, - plan.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); + + let test_config = TestConfig::default(); + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2412,6 +2324,7 @@ fn repartition_transitively_past_sort_with_filter() -> Result<()> { }]); let plan = sort_exec(sort_key, filter_exec(parquet_exec()), false); + // Test: run EnforceDistribution, then EnforceSort. let expected = &[ "SortPreservingMergeExec: [a@0 ASC]", " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", @@ -2420,13 +2333,10 @@ fn repartition_transitively_past_sort_with_filter() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; + let test_config = TestConfig::default(); + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; - assert_optimized!( - expected, - plan.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - + // Test: result IS DIFFERENT, if EnforceSorting is run first: let expected_first_sort_enforcement = &[ "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", " CoalescePartitionsExec", @@ -2435,11 +2345,7 @@ fn repartition_transitively_past_sort_with_filter() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( - expected_first_sort_enforcement, - plan, - &TestConfig::new(DoFirst::Sorting) - ); + test_config.run(expected_first_sort_enforcement, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2465,6 +2371,7 @@ fn 
repartition_transitively_past_sort_with_projection_and_filter() -> Result<()> false, ); + // Test: run EnforceDistribution, then EnforceSort. let expected = &[ "SortPreservingMergeExec: [a@0 ASC]", // Expect repartition on the input to the sort (as it can benefit from additional parallelism) @@ -2475,13 +2382,10 @@ fn repartition_transitively_past_sort_with_projection_and_filter() -> Result<()> " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; + let test_config = TestConfig::default(); + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; - assert_optimized!( - expected, - plan.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - + // Test: result IS DIFFERENT, if EnforceSorting is run first: let expected_first_sort_enforcement = &[ "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", " CoalescePartitionsExec", @@ -2490,11 +2394,7 @@ fn repartition_transitively_past_sort_with_projection_and_filter() -> Result<()> " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( - expected_first_sort_enforcement, - plan, - &TestConfig::new(DoFirst::Sorting) - ); + test_config.run(expected_first_sort_enforcement, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2505,24 +2405,33 @@ fn parallelization_single_partition() -> Result<()> { let plan_parquet = aggregate_exec_with_alias(parquet_exec(), alias.clone()); let plan_csv = aggregate_exec_with_alias(csv_exec(), alias); + let test_config = TestConfig::default() + .with_prefer_repartition_file_scans(10) + .with_query_execution_partitions(2); + + // Test: with parquet let expected_parquet = [ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", " RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2", " AggregateExec: mode=Partial, 
gby=[a@0 as a], aggr=[]", " DataSourceExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, projection=[a, b, c, d, e], file_type=parquet", ]; + test_config.run( + &expected_parquet, + plan_parquet.clone(), + &DISTRIB_DISTRIB_SORT, + )?; + test_config.run(&expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?; + + // Test: with csv let expected_csv = [ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", " RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2", " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", " DataSourceExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - - let test_config = TestConfig::new(DoFirst::Distribution) - .with_prefer_repartition_file_scans(10) - .with_query_execution_partitions(2); - assert_optimized!(expected_parquet, plan_parquet, &test_config); - assert_optimized!(expected_csv, plan_csv, &test_config); + test_config.run(&expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(&expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2538,34 +2447,47 @@ fn parallelization_multiple_files() -> Result<()> { let plan = filter_exec(parquet_exec_multiple_sorted(vec![sort_key.clone()])); let plan = sort_required_exec_with_req(plan, sort_key); - let test_config = TestConfig::new(DoFirst::Distribution) + let test_config = TestConfig::default() .with_prefer_existing_sort() .with_prefer_repartition_file_scans(1); // The groups must have only contiguous ranges of rows from the same file // if any group has rows from multiple files, the data is no longer sorted destroyed // https://github.com/apache/datafusion/issues/8451 - let expected = [ + let expected_with_3_target_partitions = [ "SortRequiredExec: [a@0 ASC]", " FilterExec: c@2 = 0", " DataSourceExec: file_groups={3 groups: [[x:0..50], [y:0..100], [x:50..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; - assert_optimized!( - 
expected, - plan, - &test_config.clone().with_query_execution_partitions(3) - ); + let test_config_concurrency_3 = + test_config.clone().with_query_execution_partitions(3); + test_config_concurrency_3.run( + &expected_with_3_target_partitions, + plan.clone(), + &DISTRIB_DISTRIB_SORT, + )?; + test_config_concurrency_3.run( + &expected_with_3_target_partitions, + plan.clone(), + &SORT_DISTRIB_DISTRIB, + )?; - let expected = [ + let expected_with_8_target_partitions = [ "SortRequiredExec: [a@0 ASC]", " FilterExec: c@2 = 0", " DataSourceExec: file_groups={8 groups: [[x:0..25], [y:0..25], [x:25..50], [y:25..50], [x:50..75], [y:50..75], [x:75..100], [y:75..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; - assert_optimized!( - expected, + let test_config_concurrency_8 = test_config.with_query_execution_partitions(8); + test_config_concurrency_8.run( + &expected_with_8_target_partitions, + plan.clone(), + &DISTRIB_DISTRIB_SORT, + )?; + test_config_concurrency_8.run( + &expected_with_8_target_partitions, plan, - &test_config.with_query_execution_partitions(8) - ); + &SORT_DISTRIB_DISTRIB, + )?; Ok(()) } @@ -2615,13 +2537,11 @@ fn parallelization_compressed_csv() -> Result<()> { .build(), vec![("a".to_string(), "a".to_string())], ); - assert_optimized!( - expected, - plan, - &TestConfig::new(DoFirst::Distribution) - .with_query_execution_partitions(2) - .with_prefer_repartition_file_scans(10) - ); + let test_config = TestConfig::default() + .with_query_execution_partitions(2) + .with_prefer_repartition_file_scans(10); + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; } Ok(()) } @@ -2632,6 +2552,11 @@ fn parallelization_two_partitions() -> Result<()> { let plan_parquet = aggregate_exec_with_alias(parquet_exec_multiple(), alias.clone()); let plan_csv = aggregate_exec_with_alias(csv_exec_multiple(), alias); + let test_config = TestConfig::default() + 
.with_query_execution_partitions(2) + .with_prefer_repartition_file_scans(10); + + // Test: with parquet let expected_parquet = [ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", " RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2", @@ -2639,6 +2564,14 @@ fn parallelization_two_partitions() -> Result<()> { // Plan already has two partitions " DataSourceExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, projection=[a, b, c, d, e], file_type=parquet", ]; + test_config.run( + &expected_parquet, + plan_parquet.clone(), + &DISTRIB_DISTRIB_SORT, + )?; + test_config.run(&expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?; + + // Test: with csv let expected_csv = [ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", " RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2", @@ -2646,11 +2579,9 @@ fn parallelization_two_partitions() -> Result<()> { // Plan already has two partitions " DataSourceExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - let test_config = TestConfig::new(DoFirst::Distribution) - .with_query_execution_partitions(2) - .with_prefer_repartition_file_scans(10); - assert_optimized!(expected_parquet, plan_parquet, &test_config); - assert_optimized!(expected_csv, plan_csv, &test_config); + test_config.run(&expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(&expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?; + Ok(()) } @@ -2660,6 +2591,11 @@ fn parallelization_two_partitions_into_four() -> Result<()> { let plan_parquet = aggregate_exec_with_alias(parquet_exec_multiple(), alias.clone()); let plan_csv = aggregate_exec_with_alias(csv_exec_multiple(), alias); + let test_config = TestConfig::default() + .with_query_execution_partitions(4) + .with_prefer_repartition_file_scans(10); + + // Test: with parquet let expected_parquet = [ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", " 
RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4", @@ -2667,6 +2603,14 @@ fn parallelization_two_partitions_into_four() -> Result<()> { // Multiple source files splitted across partitions " DataSourceExec: file_groups={4 groups: [[x:0..50], [x:50..100], [y:0..50], [y:50..100]]}, projection=[a, b, c, d, e], file_type=parquet", ]; + test_config.run( + &expected_parquet, + plan_parquet.clone(), + &DISTRIB_DISTRIB_SORT, + )?; + test_config.run(&expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?; + + // Test: with csv let expected_csv = [ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", " RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4", @@ -2674,11 +2618,8 @@ fn parallelization_two_partitions_into_four() -> Result<()> { // Multiple source files splitted across partitions " DataSourceExec: file_groups={4 groups: [[x:0..50], [x:50..100], [y:0..50], [y:50..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - let test_config = TestConfig::new(DoFirst::Distribution) - .with_query_execution_partitions(4) - .with_prefer_repartition_file_scans(10); - assert_optimized!(expected_parquet, plan_parquet, &test_config); - assert_optimized!(expected_csv, plan_csv, &test_config); + test_config.run(&expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(&expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2693,6 +2634,9 @@ fn parallelization_sorted_limit() -> Result<()> { let plan_parquet = limit_exec(sort_exec(sort_key.clone(), parquet_exec(), false)); let plan_csv = limit_exec(sort_exec(sort_key, csv_exec(), false)); + let test_config = TestConfig::default(); + + // Test: with parquet let expected_parquet = &[ "GlobalLimitExec: skip=0, fetch=100", " LocalLimitExec: fetch=100", @@ -2701,6 +2645,14 @@ fn parallelization_sorted_limit() -> Result<()> { // Doesn't parallelize for SortExec without preserve_partitioning " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet", ]; + test_config.run( + expected_parquet, + plan_parquet.clone(), + &DISTRIB_DISTRIB_SORT, + )?; + test_config.run(expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?; + + // Test: with csv let expected_csv = &[ "GlobalLimitExec: skip=0, fetch=100", " LocalLimitExec: fetch=100", @@ -2709,16 +2661,8 @@ fn parallelization_sorted_limit() -> Result<()> { // Doesn't parallelize for SortExec without preserve_partitioning " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - assert_optimized!( - expected_parquet, - plan_parquet, - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!( - expected_csv, - plan_csv, - &TestConfig::new(DoFirst::Distribution) - ); + test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2737,6 +2681,9 @@ fn parallelization_limit_with_filter() -> Result<()> { ))); let plan_csv = limit_exec(filter_exec(sort_exec(sort_key, csv_exec(), false))); + let test_config = TestConfig::default(); + + // Test: with parquet let expected_parquet = &[ "GlobalLimitExec: skip=0, fetch=100", " CoalescePartitionsExec", @@ -2749,6 +2696,14 @@ fn parallelization_limit_with_filter() -> Result<()> { // SortExec doesn't benefit from input partitioning " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; + test_config.run( + expected_parquet, + plan_parquet.clone(), + &DISTRIB_DISTRIB_SORT, + )?; + test_config.run(expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?; + + // Test: with csv let expected_csv = &[ "GlobalLimitExec: skip=0, fetch=100", " CoalescePartitionsExec", @@ -2761,16 +2716,8 @@ fn parallelization_limit_with_filter() -> Result<()> { // SortExec doesn't benefit from input partitioning " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - 
assert_optimized!( - expected_parquet, - plan_parquet, - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!( - expected_csv, - plan_csv, - &TestConfig::new(DoFirst::Distribution) - ); + test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2785,6 +2732,9 @@ fn parallelization_ignores_limit() -> Result<()> { let plan_csv = aggregate_exec_with_alias(limit_exec(filter_exec(limit_exec(csv_exec()))), alias); + let test_config = TestConfig::default(); + + // Test: with parquet let expected_parquet = &[ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", @@ -2801,6 +2751,14 @@ fn parallelization_ignores_limit() -> Result<()> { " LocalLimitExec: fetch=100", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; + test_config.run( + expected_parquet, + plan_parquet.clone(), + &DISTRIB_DISTRIB_SORT, + )?; + test_config.run(expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?; + + // Test: with csv let expected_csv = &[ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", @@ -2817,16 +2775,8 @@ fn parallelization_ignores_limit() -> Result<()> { " LocalLimitExec: fetch=100", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - assert_optimized!( - expected_parquet, - plan_parquet, - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!( - expected_csv, - plan_csv, - &TestConfig::new(DoFirst::Distribution) - ); + test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2836,6 +2786,9 @@ fn parallelization_union_inputs() -> Result<()> { let plan_parquet = union_exec(vec![parquet_exec(); 5]); 
let plan_csv = union_exec(vec![csv_exec(); 5]); + let test_config = TestConfig::default(); + + // Test: with parquet let expected_parquet = &[ "UnionExec", // Union doesn't benefit from input partitioning - no parallelism @@ -2845,6 +2798,14 @@ fn parallelization_union_inputs() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; + test_config.run( + expected_parquet, + plan_parquet.clone(), + &DISTRIB_DISTRIB_SORT, + )?; + test_config.run(expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?; + + // Test: with csv let expected_csv = &[ "UnionExec", // Union doesn't benefit from input partitioning - no parallelism @@ -2854,16 +2815,8 @@ fn parallelization_union_inputs() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - assert_optimized!( - expected_parquet, - plan_parquet, - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!( - expected_csv, - plan_csv, - &TestConfig::new(DoFirst::Distribution) - ); + test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2883,23 +2836,28 @@ fn parallelization_prior_to_sort_preserving_merge() -> Result<()> { let plan_csv = sort_preserving_merge_exec(sort_key.clone(), csv_exec_with_sort(vec![sort_key])); + let test_config = TestConfig::default(); + + // Expected Outcome: // parallelization is not beneficial for SortPreservingMerge + + // Test: with parquet let expected_parquet = &[ "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; + test_config.run( + expected_parquet, + plan_parquet.clone(), + 
&DISTRIB_DISTRIB_SORT, + )?; + test_config.run(expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?; + + // Test: with csv let expected_csv = &[ "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", ]; - assert_optimized!( - expected_parquet, - plan_parquet, - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!( - expected_csv, - plan_csv, - &TestConfig::new(DoFirst::Distribution) - ); + test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2918,30 +2876,61 @@ fn parallelization_sort_preserving_merge_with_union() -> Result<()> { let plan_parquet = sort_preserving_merge_exec(sort_key.clone(), input_parquet); let plan_csv = sort_preserving_merge_exec(sort_key, input_csv); + let test_config = TestConfig::default(); + + // Expected Outcome: // should not repartition (union doesn't benefit from increased parallelism) // should not sort (as the data was already sorted) + + // Test: with parquet let expected_parquet = &[ "SortPreservingMergeExec: [c@2 ASC]", " UnionExec", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; + test_config.run( + expected_parquet, + plan_parquet.clone(), + &DISTRIB_DISTRIB_SORT, + )?; + let expected_parquet_first_sort_enforcement = &[ + // no SPM + "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", + // has coalesce + " CoalescePartitionsExec", + " UnionExec", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", + ]; + test_config.run( + 
expected_parquet_first_sort_enforcement, + plan_parquet, + &SORT_DISTRIB_DISTRIB, + )?; + + // Test: with csv let expected_csv = &[ "SortPreservingMergeExec: [c@2 ASC]", " UnionExec", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", ]; - assert_optimized!( - expected_parquet, - plan_parquet, - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!( - expected_csv, - plan_csv, - &TestConfig::new(DoFirst::Distribution) - ); + test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?; + let expected_csv_first_sort_enforcement = &[ + // no SPM + "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", + // has coalesce + " CoalescePartitionsExec", + " UnionExec", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", + ]; + test_config.run( + expected_csv_first_sort_enforcement, + plan_csv.clone(), + &SORT_DISTRIB_DISTRIB, + )?; Ok(()) } @@ -2962,25 +2951,30 @@ fn parallelization_does_not_benefit() -> Result<()> { let plan_csv = sort_required_exec_with_req(csv_exec_with_sort(vec![sort_key.clone()]), sort_key); + let test_config = TestConfig::default(); + + // Expected Outcome: // no parallelization, because SortRequiredExec doesn't benefit from increased parallelism + + // Test: with parquet let expected_parquet = &[ "SortRequiredExec: [c@2 ASC]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; + test_config.run( + expected_parquet, + plan_parquet.clone(), + &DISTRIB_DISTRIB_SORT, + )?; + test_config.run(expected_parquet, 
plan_parquet, &SORT_DISTRIB_DISTRIB)?; + + // Test: with csv let expected_csv = &[ "SortRequiredExec: [c@2 ASC]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", ]; - assert_optimized!( - expected_parquet, - plan_parquet, - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!( - expected_csv, - plan_csv, - &TestConfig::new(DoFirst::Distribution) - ); + test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -3014,16 +3008,19 @@ fn parallelization_ignores_transitively_with_projection_parquet() -> Result<()> ]; plans_matches_expected!(expected, &plan_parquet); + // Expected Outcome: // data should not be repartitioned / resorted let expected_parquet = &[ "ProjectionExec: expr=[a@0 as a2, c@2 as c2]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!( + let test_config = TestConfig::default(); + test_config.run( expected_parquet, - plan_parquet, - &TestConfig::new(DoFirst::Distribution) - ); + plan_parquet.clone(), + &DISTRIB_DISTRIB_SORT, + )?; + test_config.run(expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -3057,16 +3054,15 @@ fn parallelization_ignores_transitively_with_projection_csv() -> Result<()> { ]; plans_matches_expected!(expected, &plan_csv); + // Expected Outcome: // data should not be repartitioned / resorted let expected_csv = &[ "ProjectionExec: expr=[a@0 as a2, c@2 as c2]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", ]; - assert_optimized!( - expected_csv, - plan_csv, - &TestConfig::new(DoFirst::Distribution) - ); + let test_config = TestConfig::default(); + test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?; + 
test_config.run(expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -3090,12 +3086,10 @@ fn remove_redundant_roundrobins() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( - expected, - physical_plan.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!(expected, physical_plan, &TestConfig::new(DoFirst::Sorting)); + + let test_config = TestConfig::default(); + test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -3111,6 +3105,10 @@ fn remove_unnecessary_spm_after_filter() -> Result<()> { let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]); let physical_plan = sort_preserving_merge_exec(sort_key, filter_exec(input)); + // TestConfig: Prefer existing sort. + let test_config = TestConfig::default().with_prefer_existing_sort(); + + // Expected Outcome: // Original plan expects its output to be ordered by c@2 ASC. // This is still satisfied since, after filter that column is constant. 
let expected = &[ @@ -3119,16 +3117,9 @@ fn remove_unnecessary_spm_after_filter() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, preserve_order=true, sort_exprs=c@2 ASC", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!( - expected, - physical_plan.clone(), - &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort() - ); - assert_optimized!( - expected, - physical_plan, - &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort() - ); + + test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -3144,22 +3135,17 @@ fn preserve_ordering_through_repartition() -> Result<()> { let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]); let physical_plan = sort_preserving_merge_exec(sort_key, filter_exec(input)); + // TestConfig: Prefer existing sort. 
+ let test_config = TestConfig::default().with_prefer_existing_sort(); + let expected = &[ "SortPreservingMergeExec: [d@3 ASC]", " FilterExec: c@2 = 0", " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, preserve_order=true, sort_exprs=d@3 ASC", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[d@3 ASC], file_type=parquet", ]; - assert_optimized!( - expected, - physical_plan.clone(), - &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort() - ); - assert_optimized!( - expected, - physical_plan, - &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort() - ); + test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -3174,6 +3160,9 @@ fn do_not_preserve_ordering_through_repartition() -> Result<()> { let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]); let physical_plan = sort_preserving_merge_exec(sort_key, filter_exec(input)); + let test_config = TestConfig::default(); + + // Test: run EnforceDistribution, then EnforceSort. 
let expected = &[ "SortPreservingMergeExec: [a@0 ASC]", " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", @@ -3181,21 +3170,21 @@ fn do_not_preserve_ordering_through_repartition() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; + test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?; - assert_optimized!( - expected, - physical_plan.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - - let expected = &[ + // Test: result IS DIFFERENT, if EnforceSorting is run first: + let expected_first_sort_enforcement = &[ "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", " CoalescePartitionsExec", " FilterExec: c@2 = 0", " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; - assert_optimized!(expected, physical_plan, &TestConfig::new(DoFirst::Sorting)); + test_config.run( + expected_first_sort_enforcement, + physical_plan, + &SORT_DISTRIB_DISTRIB, + )?; Ok(()) } @@ -3218,12 +3207,9 @@ fn no_need_for_sort_after_filter() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!( - expected, - physical_plan.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!(expected, physical_plan, &TestConfig::new(DoFirst::Sorting)); + let test_config = TestConfig::default(); + test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -3243,6 +3229,9 @@ fn do_not_preserve_ordering_through_repartition2() -> Result<()> { }]); let 
physical_plan = sort_preserving_merge_exec(sort_req, filter_exec(input)); + let test_config = TestConfig::default(); + + // Test: run EnforceDistribution, then EnforceSorting. let expected = &[ "SortPreservingMergeExec: [a@0 ASC]", " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", @@ -3250,14 +3239,10 @@ " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; + test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?; - assert_optimized!( - expected, - physical_plan.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - - let expected = &[ + // Test: result IS DIFFERENT, if EnforceSorting is run first: + let expected_first_sort_enforcement = &[ "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", " CoalescePartitionsExec", " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", @@ -3265,7 +3250,11 @@ " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!(expected, physical_plan, &TestConfig::new(DoFirst::Sorting)); + test_config.run( + expected_first_sort_enforcement, + physical_plan, + &SORT_DISTRIB_DISTRIB, + )?; Ok(()) } @@ -3285,12 +3274,9 @@ fn do_not_preserve_ordering_through_repartition3() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!( - expected, - physical_plan.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!(expected, physical_plan, 
&TestConfig::new(DoFirst::Sorting)); + let test_config = TestConfig::default(); + test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -3380,22 +3366,17 @@ fn do_not_add_unnecessary_hash() -> Result<()> { let input = parquet_exec_with_sort(vec![sort_key]); let physical_plan = aggregate_exec_with_alias(input, alias); + // TestConfig: + // Make sure target partition number is 1. In this case hash repartition is unnecessary. + let test_config = TestConfig::default().with_query_execution_partitions(1); + let expected = &[ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - // Make sure target partition number is 1. In this case hash repartition is unnecessary - assert_optimized!( - expected, - physical_plan.clone(), - &TestConfig::new(DoFirst::Distribution).with_query_execution_partitions(1) - ); - assert_optimized!( - expected, - physical_plan, - &TestConfig::new(DoFirst::Sorting).with_query_execution_partitions(1) - ); + test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -3412,6 +3393,10 @@ fn do_not_add_unnecessary_hash2() -> Result<()> { let aggregate = aggregate_exec_with_alias(input, alias.clone()); let physical_plan = aggregate_exec_with_alias(aggregate, alias); + // TestConfig: + // Make sure target partition number is larger than 2 (e.g. partition number at the source). + let test_config = TestConfig::default().with_query_execution_partitions(4); + let expected = &[ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", // Since hash requirements of this operator is satisfied. 
There shouldn't be @@ -3423,17 +3408,8 @@ fn do_not_add_unnecessary_hash2() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - // Make sure target partition number is larger than 2 (e.g partition number at the source). - assert_optimized!( - expected, - physical_plan.clone(), - &TestConfig::new(DoFirst::Distribution).with_query_execution_partitions(4) - ); - assert_optimized!( - expected, - physical_plan, - &TestConfig::new(DoFirst::Sorting).with_query_execution_partitions(4) - ); + test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -3450,12 +3426,10 @@ fn optimize_away_unnecessary_repartition() -> Result<()> { let expected = &["DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet"]; - assert_optimized!( - expected, - physical_plan.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!(expected, physical_plan, &TestConfig::new(DoFirst::Sorting)); + + let test_config = TestConfig::default(); + test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -3481,12 +3455,9 @@ fn optimize_away_unnecessary_repartition2() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( - expected, - physical_plan.clone(), - &TestConfig::new(DoFirst::Distribution) - ); - assert_optimized!(expected, physical_plan, &TestConfig::new(DoFirst::Sorting)); + let test_config = TestConfig::default(); + test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, physical_plan, 
&SORT_DISTRIB_DISTRIB)?; Ok(()) }