|
17 | 17 |
|
18 | 18 | use std::collections::HashMap;
|
19 | 19 | use std::env;
|
| 20 | +use std::num::NonZeroUsize; |
20 | 21 | use std::path::Path;
|
21 | 22 | use std::process::ExitCode;
|
22 | 23 | use std::sync::{Arc, LazyLock};
|
23 | 24 |
|
24 | 25 | use datafusion::error::{DataFusionError, Result};
|
25 | 26 | use datafusion::execution::context::SessionConfig;
|
26 |
| -use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool}; |
| 27 | +use datafusion::execution::memory_pool::{ |
| 28 | + FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool, |
| 29 | +}; |
27 | 30 | use datafusion::execution::runtime_env::RuntimeEnvBuilder;
|
28 | 31 | use datafusion::execution::DiskManager;
|
29 | 32 | use datafusion::prelude::SessionContext;
|
@@ -118,6 +121,13 @@ struct Args {
|
118 | 121 | )]
|
119 | 122 | mem_pool_type: PoolType,
|
120 | 123 |
|
| 124 | + #[clap( |
| 125 | + long, |
| 126 | + help = "The number of top memory consumers to display when query fails due to memory exhaustion. If you don't want to track MemoryConsumers, set this value to 0", |
| 127 | + default_value = "3" |
| 128 | + )] |
| 129 | + top_memory_consumers: usize, |
| 130 | + |
121 | 131 | #[clap(
|
122 | 132 | long,
|
123 | 133 | help = "The max number of rows to display for 'Table' format\n[possible values: numbers(0/10/...), inf(no limit)]",
|
@@ -169,9 +179,22 @@ async fn main_inner() -> Result<()> {
|
169 | 179 | if let Some(memory_limit) = args.memory_limit {
|
170 | 180 | // set memory pool type
|
171 | 181 | let pool: Arc<dyn MemoryPool> = match args.mem_pool_type {
|
172 |
| - PoolType::Fair => Arc::new(FairSpillPool::new(memory_limit)), |
173 |
| - PoolType::Greedy => Arc::new(GreedyMemoryPool::new(memory_limit)), |
| 182 | + PoolType::Fair if args.top_memory_consumers == 0 => { |
| 183 | + Arc::new(FairSpillPool::new(memory_limit)) |
| 184 | + } |
| 185 | + PoolType::Fair => Arc::new(TrackConsumersPool::new( |
| 186 | + FairSpillPool::new(memory_limit), |
| 187 | + NonZeroUsize::new(args.top_memory_consumers).unwrap(), |
| 188 | + )), |
| 189 | + PoolType::Greedy if args.top_memory_consumers == 0 => { |
| 190 | + Arc::new(GreedyMemoryPool::new(memory_limit)) |
| 191 | + } |
| 192 | + PoolType::Greedy => Arc::new(TrackConsumersPool::new( |
| 193 | + GreedyMemoryPool::new(memory_limit), |
| 194 | + NonZeroUsize::new(args.top_memory_consumers).unwrap(), |
| 195 | + )), |
174 | 196 | };
|
| 197 | + |
175 | 198 | rt_builder = rt_builder.with_memory_pool(pool)
|
176 | 199 | }
|
177 | 200 |
|
|
0 commit comments