Skip to content

Commit ccf09e4

Browse files
adriangb and Omega359
authored and committed
Add spilling to RepartitionExec (apache#18014)
Addresses apache#17334 (comment) I ran into this using `datafusion-distributed` which I think makes the issue of partition execution time skew even more likely to happen. As per that issue it can also happen with non-distributed queries, e.g. if one partition's sort spills and others don't. Due to the nature of `RepartitionExec` I don't think we can bound the channels, as that could lead to deadlocks. So what I did was at least make queries that would previously have failed continue forward with disk spilling. I did not account for memory usage when reading batches back from disk since DataFusion does not generally account for "in-flight" batches. Written with help from Claude --------- Co-authored-by: Bruce Ritchie <[email protected]>
1 parent f15efcd commit ccf09e4

File tree

3 files changed

+562
-67
lines changed

3 files changed

+562
-67
lines changed

datafusion/core/tests/memory_limit/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use std::sync::{Arc, LazyLock};
2323

2424
#[cfg(feature = "extended_tests")]
2525
mod memory_limit_validation;
26+
mod repartition_mem_limit;
2627
use arrow::array::{ArrayRef, DictionaryArray, Int32Array, RecordBatch, StringViewArray};
2728
use arrow::compute::SortOptions;
2829
use arrow::datatypes::{Int32Type, SchemaRef};
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::Arc;
19+
20+
use arrow::array::{ArrayRef, Int32Array, RecordBatch};
21+
use datafusion::{
22+
assert_batches_sorted_eq,
23+
prelude::{SessionConfig, SessionContext},
24+
};
25+
use datafusion_catalog::MemTable;
26+
use datafusion_common::tree_node::{Transformed, TreeNode};
27+
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
28+
use datafusion_physical_plan::{repartition::RepartitionExec, ExecutionPlanProperties};
29+
use futures::TryStreamExt;
30+
use itertools::Itertools;
31+
32+
/// End to end test for spilling in RepartitionExec.
33+
/// The idea is to make a real world query with a relatively low memory limit and
34+
/// then drive one partition at a time, simulating dissimilar execution speed in partitions.
35+
/// Just as some examples of real world scenarios where this can happen consider
36+
/// lopsided groups in a group by especially if one partitions spills and others don't,
37+
/// or in distributed systems if one upstream node is slower than others.
38+
#[tokio::test]
39+
async fn test_repartition_memory_limit() {
40+
let runtime = RuntimeEnvBuilder::new()
41+
.with_memory_limit(1024 * 1024, 1.0)
42+
.build()
43+
.unwrap();
44+
let config = SessionConfig::new()
45+
.with_batch_size(32)
46+
.with_target_partitions(2);
47+
let ctx = SessionContext::new_with_config_rt(config, Arc::new(runtime));
48+
let batches = vec![RecordBatch::try_from_iter(vec![(
49+
"c1",
50+
Arc::new(Int32Array::from_iter_values((0..10).cycle().take(100_000))) as ArrayRef,
51+
)])
52+
.unwrap()];
53+
let table = Arc::new(MemTable::try_new(batches[0].schema(), vec![batches]).unwrap());
54+
ctx.register_table("t", table).unwrap();
55+
let plan = ctx
56+
.state()
57+
.create_logical_plan("SELECT c1, count(*) as c FROM t GROUP BY c1;")
58+
.await
59+
.unwrap();
60+
let plan = ctx.state().create_physical_plan(&plan).await.unwrap();
61+
assert_eq!(plan.output_partitioning().partition_count(), 2);
62+
// Execute partition 0, this should cause items going into the rest of the partitions to queue up and because
63+
// of the low memory limit should spill to disk.
64+
let batches0 = Arc::clone(&plan)
65+
.execute(0, ctx.task_ctx())
66+
.unwrap()
67+
.try_collect::<Vec<_>>()
68+
.await
69+
.unwrap();
70+
71+
let mut metrics = None;
72+
Arc::clone(&plan)
73+
.transform_down(|node| {
74+
if node.as_any().is::<RepartitionExec>() {
75+
metrics = node.metrics();
76+
}
77+
Ok(Transformed::no(node))
78+
})
79+
.unwrap();
80+
81+
let metrics = metrics.unwrap();
82+
assert!(metrics.spilled_bytes().unwrap() > 0);
83+
assert!(metrics.spilled_rows().unwrap() > 0);
84+
assert!(metrics.spill_count().unwrap() > 0);
85+
86+
// Execute the other partition
87+
let batches1 = Arc::clone(&plan)
88+
.execute(1, ctx.task_ctx())
89+
.unwrap()
90+
.try_collect::<Vec<_>>()
91+
.await
92+
.unwrap();
93+
94+
let all_batches = batches0
95+
.into_iter()
96+
.chain(batches1.into_iter())
97+
.collect_vec();
98+
#[rustfmt::skip]
99+
let expected = &[
100+
"+----+-------+",
101+
"| c1 | c |",
102+
"+----+-------+",
103+
"| 0 | 10000 |",
104+
"| 1 | 10000 |",
105+
"| 2 | 10000 |",
106+
"| 3 | 10000 |",
107+
"| 4 | 10000 |",
108+
"| 5 | 10000 |",
109+
"| 6 | 10000 |",
110+
"| 7 | 10000 |",
111+
"| 8 | 10000 |",
112+
"| 9 | 10000 |",
113+
"+----+-------+",
114+
];
115+
assert_batches_sorted_eq!(expected, &all_batches);
116+
}

0 commit comments

Comments
 (0)