Skip to content

by default enable skip agg #12558

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docs/additional-functionality/advanced_configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ Name | Description | Default Value | Applicable at
<a name="shuffle.ucx.activeMessages.forceRndv"></a>spark.rapids.shuffle.ucx.activeMessages.forceRndv|Set to true to force 'rndv' mode for all UCX Active Messages. This should only be required with UCX 1.10.x. UCX 1.11.x deployments should set to false.|false|Startup
<a name="shuffle.ucx.managementServerHost"></a>spark.rapids.shuffle.ucx.managementServerHost|The host to be used to start the management server|null|Startup
<a name="shuffle.ucx.useWakeup"></a>spark.rapids.shuffle.ucx.useWakeup|When set to true, use UCX's event-based progress (epoll) in order to wake up the progress thread when needed, instead of a hot loop.|true|Startup
<a name="sql.agg.skipAggPassReductionRatio"></a>spark.rapids.sql.agg.skipAggPassReductionRatio|In non-final aggregation stages, if the previous pass has a row reduction ratio greater than this value, the next aggregation pass will be skipped.Setting this to 1 essentially disables this feature.|1.0|Runtime
<a name="sql.allowMultipleJars"></a>spark.rapids.sql.allowMultipleJars|Allow multiple rapids-4-spark, spark-rapids-jni, and cudf jars on the classpath. Spark will take the first one it finds, so the version may not be expected. Possisble values are ALWAYS: allow all jars, SAME_REVISION: only allow jars with the same revision, NEVER: do not allow multiple jars at all.|SAME_REVISION|Startup
<a name="sql.castDecimalToFloat.enabled"></a>spark.rapids.sql.castDecimalToFloat.enabled|Casting from decimal to floating point types on the GPU returns results that have tiny difference compared to results returned from CPU.|true|Runtime
<a name="sql.castFloatToDecimal.enabled"></a>spark.rapids.sql.castFloatToDecimal.enabled|Casting from floating point types to decimal on the GPU returns results that have tiny difference compared to results returned from CPU.|true|Runtime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -983,7 +983,7 @@ class GpuMergeAggregateIterator(
// It's only based on first batch of first pass agg, so it's an estimate
val firstPassReductionRatioEstimate = 1.0 * peek.numRows() / localInputRowsCount.value
if (firstPassReductionRatioEstimate > skipAggPassReductionRatio) {
logDebug("Skipping second and third pass aggregation due to " +
logInfo("Skipping second and third pass aggregation due to " +
"too high reduction ratio in first pass: " +
s"$firstPassReductionRatioEstimate")
// if so, skip any aggregation, return the origin batch directly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1601,12 +1601,15 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.createWithDefault(true)

val SKIP_AGG_PASS_REDUCTION_RATIO = conf("spark.rapids.sql.agg.skipAggPassReductionRatio")
.doc("In non-final aggregation stages, if the previous pass has a row reduction ratio " +
.doc("In non-final aggregation stages, if the previous pass has a row retention ratio " +
"greater than this value, the next aggregation pass will be skipped." +
"Setting this to 1 essentially disables this feature.")
"Setting this to 1 essentially disables this feature. For Historical reasons it is " +
"called skipAggPassReductionRatio, actually skipAggPassRetentionRatio would have " +
"been better")
.internal()
.doubleConf
.checkValue(v => v >= 0 && v <= 1, "The ratio value must be in [0, 1].")
.createWithDefault(1.0)
.createWithDefault(0.85)

val FORCE_SINGLE_PASS_PARTIAL_SORT_AGG: ConfEntryWithDefault[Boolean] =
conf("spark.rapids.sql.agg.forceSinglePassPartialSort")
Expand Down
Loading