From 08e7e91f304ce562285af0599001fb8c41ca5722 Mon Sep 17 00:00:00 2001 From: SeoYoung Lee Date: Sun, 18 May 2025 03:15:07 +0000 Subject: [PATCH] add top-memory-consumers option in cli add snapshot tests for memory exhaustion --- datafusion-cli/src/main.rs | 29 +++++++++++++-- datafusion-cli/tests/cli_integration.rs | 36 +++++++++++++++++++ .../cli_top_memory_consumers@no_track.snap | 21 +++++++++++ .../cli_top_memory_consumers@top2.snap | 24 +++++++++++++ ...cli_top_memory_consumers@top3_default.snap | 23 ++++++++++++ docs/source/user-guide/cli/usage.md | 3 ++ 6 files changed, 133 insertions(+), 3 deletions(-) create mode 100644 datafusion-cli/tests/snapshots/cli_top_memory_consumers@no_track.snap create mode 100644 datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap create mode 100644 datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 16ddf2a6a5f8..091b177d0819 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -17,13 +17,16 @@ use std::collections::HashMap; use std::env; +use std::num::NonZeroUsize; use std::path::Path; use std::process::ExitCode; use std::sync::{Arc, LazyLock}; use datafusion::error::{DataFusionError, Result}; use datafusion::execution::context::SessionConfig; -use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool}; +use datafusion::execution::memory_pool::{ + FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool, +}; use datafusion::execution::runtime_env::RuntimeEnvBuilder; use datafusion::execution::DiskManager; use datafusion::prelude::SessionContext; @@ -118,6 +121,13 @@ struct Args { )] mem_pool_type: PoolType, + #[clap( + long, + help = "The number of top memory consumers to display when query fails due to memory exhaustion. To disable memory consumer tracking, set this value to 0", + default_value = "3" + )] + top_memory_consumers: usize, + #[clap( long, help = "The max number of rows to display for 'Table' format\n[possible values: numbers(0/10/...), inf(no limit)]", @@ -169,9 +179,22 @@ async fn main_inner() -> Result<()> { if let Some(memory_limit) = args.memory_limit { // set memory pool type let pool: Arc = match args.mem_pool_type { - PoolType::Fair => Arc::new(FairSpillPool::new(memory_limit)), - PoolType::Greedy => Arc::new(GreedyMemoryPool::new(memory_limit)), + PoolType::Fair if args.top_memory_consumers == 0 => { + Arc::new(FairSpillPool::new(memory_limit)) + } + PoolType::Fair => Arc::new(TrackConsumersPool::new( + FairSpillPool::new(memory_limit), + NonZeroUsize::new(args.top_memory_consumers).unwrap(), + )), + PoolType::Greedy if args.top_memory_consumers == 0 => { + Arc::new(GreedyMemoryPool::new(memory_limit)) + } + PoolType::Greedy => Arc::new(TrackConsumersPool::new( + GreedyMemoryPool::new(memory_limit), + NonZeroUsize::new(args.top_memory_consumers).unwrap(), + )), }; + rt_builder = rt_builder.with_memory_pool(pool) } diff --git a/datafusion-cli/tests/cli_integration.rs b/datafusion-cli/tests/cli_integration.rs index bdab38026fcf..fb2f08157f67 100644 --- a/datafusion-cli/tests/cli_integration.rs +++ b/datafusion-cli/tests/cli_integration.rs @@ -122,6 +122,42 @@ fn test_cli_format<'a>(#[case] format: &'a str) { assert_cmd_snapshot!(cmd); } +#[rstest] +#[case("no_track", ["--top-memory-consumers", "0"])] +#[case("top2", ["--top-memory-consumers", "2"])] +#[case("top3_default", [])] +#[test] +fn test_cli_top_memory_consumers<'a>( + #[case] snapshot_name: &str, + #[case] top_memory_consumers: impl IntoIterator, +) { + let mut settings = make_settings(); + + settings.set_snapshot_suffix(snapshot_name); + + settings.add_filter( + r"[^\s]+\#\d+\(can spill: (true|false)\) consumed .*?B", + "Consumer(can spill: bool) consumed XB", + ); + settings.add_filter( + r"Error: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total pool", + "Error: Failed to allocate ", + ); + settings.add_filter( + r"Resources exhausted: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total pool", + "Resources exhausted: Failed to allocate", + ); + + let _bound = settings.bind_to_scope(); + + let mut cmd = cli(); + let sql = "select * from generate_series(1,500000) as t1(v1) order by v1;"; + cmd.args(["--memory-limit", "10M", "--command", sql]); + cmd.args(top_memory_consumers); + + assert_cmd_snapshot!(cmd); +} + #[tokio::test] async fn test_cli() { if env::var("TEST_STORAGE_INTEGRATION").is_err() { diff --git a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@no_track.snap b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@no_track.snap new file mode 100644 index 000000000000..89b646a531f8 --- /dev/null +++ b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@no_track.snap @@ -0,0 +1,21 @@ +--- +source: datafusion-cli/tests/cli_integration.rs +info: + program: datafusion-cli + args: + - "--memory-limit" + - 10M + - "--command" + - "select * from generate_series(1,500000) as t1(v1) order by v1;" + - "--top-memory-consumers" + - "0" +--- +success: false +exit_code: 1 +----- stdout ----- +[CLI_VERSION] +Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes +caused by +Resources exhausted: Failed to allocate + +----- stderr ----- diff --git a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap new file mode 100644 index 000000000000..ed925a6f6461 --- /dev/null +++ b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap @@ -0,0 +1,24 @@ +--- +source: datafusion-cli/tests/cli_integration.rs +info: + program: datafusion-cli + args: + - "--memory-limit" + - 10M + - "--command" + - "select * from generate_series(1,500000) as t1(v1) order by v1;" + - "--top-memory-consumers" + - "2" +--- +success: false +exit_code: 1 +----- stdout ----- +[CLI_VERSION] +Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes +caused by +Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: + Consumer(can spill: bool) consumed XB, + Consumer(can spill: bool) consumed XB. +Error: Failed to allocate + +----- stderr ----- diff --git a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap new file mode 100644 index 000000000000..f35e3b117178 --- /dev/null +++ b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap @@ -0,0 +1,23 @@ +--- +source: datafusion-cli/tests/cli_integration.rs +info: + program: datafusion-cli + args: + - "--memory-limit" + - 10M + - "--command" + - "select * from generate_series(1,500000) as t1(v1) order by v1;" +--- +success: false +exit_code: 1 +----- stdout ----- +[CLI_VERSION] +Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes +caused by +Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: + Consumer(can spill: bool) consumed XB, + Consumer(can spill: bool) consumed XB, + Consumer(can spill: bool) consumed XB. +Error: Failed to allocate + +----- stderr ----- diff --git a/docs/source/user-guide/cli/usage.md b/docs/source/user-guide/cli/usage.md index 68b09d319984..13f0e7cff175 100644 --- a/docs/source/user-guide/cli/usage.md +++ b/docs/source/user-guide/cli/usage.md @@ -57,6 +57,9 @@ OPTIONS: --mem-pool-type Specify the memory pool type 'greedy' or 'fair', default to 'greedy' + --top-memory-consumers + The number of top memory consumers to display when query fails due to memory exhaustion. To disable memory consumer tracking, set this value to 0 [default: 3] + -d, --disk-limit Available disk space for spilling queries (e.g. '10g'), default to None (uses DataFusion's default value of '100g')