Skip to content

Commit 29f23eb

Browse files
authored
Move repartition_file_scans out of enable_round_robin check in EnforceDistribution rule (#8731)
* Cleanup * More * Restore add_roundrobin_on_top * Restore test files * More * Restore * More * More * Make test stable * For review * Add test
1 parent 0208755 commit 29f23eb

File tree

6 files changed

+45
-31
lines changed

6 files changed

+45
-31
lines changed

datafusion/core/src/physical_optimizer/enforce_distribution.rs

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1197,32 +1197,33 @@ fn ensure_distribution(
11971197
)
11981198
.map(
11991199
|(mut child, requirement, required_input_ordering, would_benefit, maintains)| {
1200-
// Don't need to apply when the returned row count is not greater than 1:
1200+
// Don't need to apply when the returned row count is not greater than batch size
12011201
let num_rows = child.plan.statistics()?.num_rows;
12021202
let repartition_beneficial_stats = if num_rows.is_exact().unwrap_or(false) {
12031203
num_rows
12041204
.get_value()
12051205
.map(|value| value > &batch_size)
1206-
.unwrap_or(true)
1206+
.unwrap() // safe to unwrap since is_exact() is true
12071207
} else {
12081208
true
12091209
};
12101210

1211+
// When `repartition_file_scans` is set, attempt to increase
1212+
// parallelism at the source.
1213+
if repartition_file_scans && repartition_beneficial_stats {
1214+
if let Some(new_child) =
1215+
child.plan.repartitioned(target_partitions, config)?
1216+
{
1217+
child.plan = new_child;
1218+
}
1219+
}
1220+
12111221
if enable_round_robin
12121222
// Operator benefits from partitioning (e.g. filter):
12131223
&& (would_benefit && repartition_beneficial_stats)
12141224
// Unless partitioning doesn't increase the partition count, it is not beneficial:
12151225
&& child.plan.output_partitioning().partition_count() < target_partitions
12161226
{
1217-
// When `repartition_file_scans` is set, attempt to increase
1218-
// parallelism at the source.
1219-
if repartition_file_scans {
1220-
if let Some(new_child) =
1221-
child.plan.repartitioned(target_partitions, config)?
1222-
{
1223-
child.plan = new_child;
1224-
}
1225-
}
12261227
// Increase parallelism by adding round-robin repartitioning
12271228
// on top of the operator. Note that we only do this if the
12281229
// partition count is not already equal to the desired partition
@@ -1361,17 +1362,10 @@ impl DistributionContext {
13611362

13621363
fn update_children(mut self) -> Result<Self> {
13631364
for child_context in self.children_nodes.iter_mut() {
1364-
child_context.distribution_connection = match child_context.plan.as_any() {
1365-
plan_any if plan_any.is::<RepartitionExec>() => matches!(
1366-
plan_any
1367-
.downcast_ref::<RepartitionExec>()
1368-
.unwrap()
1369-
.partitioning(),
1370-
Partitioning::RoundRobinBatch(_) | Partitioning::Hash(_, _)
1371-
),
1372-
plan_any
1373-
if plan_any.is::<SortPreservingMergeExec>()
1374-
|| plan_any.is::<CoalescePartitionsExec>() =>
1365+
child_context.distribution_connection = match &child_context.plan {
1366+
plan if is_repartition(plan)
1367+
|| is_coalesce_partitions(plan)
1368+
|| is_sort_preserving_merge(plan) =>
13751369
{
13761370
true
13771371
}
@@ -3870,14 +3864,14 @@ pub(crate) mod tests {
38703864
"RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
38713865
"AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
38723866
// Plan already has two partitions
3873-
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e]",
3867+
"ParquetExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, projection=[a, b, c, d, e]",
38743868
];
38753869
let expected_csv = [
38763870
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
38773871
"RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
38783872
"AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
38793873
// Plan already has two partitions
3880-
"CsvExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], has_header=false",
3874+
"CsvExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, projection=[a, b, c, d, e], has_header=false",
38813875
];
38823876

38833877
assert_optimized!(expected_parquet, plan_parquet, true, false, 2, true, 10);

datafusion/sqllogictest/test_files/arrow_typeof.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -375,4 +375,4 @@ select arrow_cast(make_array(1, 2, 3), 'LargeList(Int64)');
375375
query T
376376
select arrow_typeof(arrow_cast(make_array(1, 2, 3), 'LargeList(Int64)'));
377377
----
378-
LargeList(Field { name: "item", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} })
378+
LargeList(Field { name: "item", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} })

datafusion/sqllogictest/test_files/repartition_scan.slt

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,26 @@ CoalesceBatchesExec: target_batch_size=8192
6363
--FilterExec: column1@0 != 42
6464
----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1
6565

66+
# disable round robin repartitioning
67+
statement ok
68+
set datafusion.optimizer.enable_round_robin_repartition = false;
69+
70+
## Expect to see the scan read the file as "4" groups with even sizes (offsets) again
71+
query TT
72+
EXPLAIN SELECT column1 FROM parquet_table WHERE column1 <> 42;
73+
----
74+
logical_plan
75+
Filter: parquet_table.column1 != Int32(42)
76+
--TableScan: parquet_table projection=[column1], partial_filters=[parquet_table.column1 != Int32(42)]
77+
physical_plan
78+
CoalesceBatchesExec: target_batch_size=8192
79+
--FilterExec: column1@0 != 42
80+
----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1
81+
82+
# enable round robin repartitioning again
83+
statement ok
84+
set datafusion.optimizer.enable_round_robin_repartition = true;
85+
6686
# create a second parquet file
6787
statement ok
6888
COPY (VALUES (100), (200)) TO 'test_files/scratch/repartition_scan/parquet_table/1.parquet'
@@ -147,7 +167,7 @@ WITH HEADER ROW
147167
LOCATION 'test_files/scratch/repartition_scan/csv_table/';
148168

149169
query I
150-
select * from csv_table;
170+
select * from csv_table ORDER BY column1;
151171
----
152172
1
153173
2
@@ -190,7 +210,7 @@ STORED AS json
190210
LOCATION 'test_files/scratch/repartition_scan/json_table/';
191211

192212
query I
193-
select * from "json_table";
213+
select * from "json_table" ORDER BY column1;
194214
----
195215
1
196216
2

datafusion/sqllogictest/test_files/timestamps.slt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1862,7 +1862,7 @@ SELECT to_timestamp(null) is null as c1,
18621862
----
18631863
true true true true true true true true true true true true true
18641864

1865-
# verify timestamp output types
1865+
# verify timestamp output types
18661866
query TTT
18671867
SELECT arrow_typeof(to_timestamp(1)), arrow_typeof(to_timestamp(null)), arrow_typeof(to_timestamp('2023-01-10 12:34:56.000'))
18681868
----
@@ -1880,7 +1880,7 @@ SELECT arrow_typeof(to_timestamp(1)) = arrow_typeof(1::timestamp) as c1,
18801880
true true true true true true
18811881

18821882
# known issues. currently overflows (expects default precision to be microsecond instead of nanoseconds. Work pending)
1883-
#verify extreme values
1883+
#verify extreme values
18841884
#query PPPPPPPP
18851885
#SELECT to_timestamp(-62125747200), to_timestamp(1926632005177), -62125747200::timestamp, 1926632005177::timestamp, cast(-62125747200 as timestamp), cast(1926632005177 as timestamp)
18861886
#----

datafusion/sqllogictest/test_files/tpch/q2.slt.part

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ order by
238238
p_partkey
239239
limit 10;
240240
----
241-
9828.21 Supplier#000000647 UNITED KINGDOM 13120 Manufacturer#5 x5U7MBZmwfG9 33-258-202-4782 s the slyly even ideas poach fluffily
241+
9828.21 Supplier#000000647 UNITED KINGDOM 13120 Manufacturer#5 x5U7MBZmwfG9 33-258-202-4782 s the slyly even ideas poach fluffily
242242
9508.37 Supplier#000000070 FRANCE 3563 Manufacturer#1 INWNH2w,OOWgNDq0BRCcBwOMQc6PdFDc4 16-821-608-1166 ests sleep quickly express ideas. ironic ideas haggle about the final T
243243
9508.37 Supplier#000000070 FRANCE 17268 Manufacturer#4 INWNH2w,OOWgNDq0BRCcBwOMQc6PdFDc4 16-821-608-1166 ests sleep quickly express ideas. ironic ideas haggle about the final T
244244
9453.01 Supplier#000000802 ROMANIA 10021 Manufacturer#5 ,6HYXb4uaHITmtMBj4Ak57Pd 29-342-882-6463 gular frets. permanently special multipliers believe blithely alongs

datafusion/sqllogictest/test_files/window.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3794,7 +3794,7 @@ select a,
37943794
1 1
37953795
2 1
37963796

3797-
# support scalar value in ORDER BY
3797+
# support scalar value in ORDER BY
37983798
query I
37993799
select rank() over (order by 1) rnk from (select 1 a union all select 2 a) x
38003800
----

0 commit comments

Comments
 (0)