shuangshuangwang/spark-adaptive

Spark SQL Adaptive Execution

Adaptive Execution has three main features: automatically setting the number of shuffle partitions, optimizing the join strategy at runtime, and handling skewed joins. These features can be enabled separately. To start with Adaptive Execution on Spark 2.2, build branch AdaptiveJoin3 and set at least spark.sql.adaptive.enabled to true. To test Adaptive Execution on Spark 2.3, use branch ae-2.3-07.
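
As a minimal sketch of turning the feature on, the snippet below renders a settings dictionary as `--conf` arguments for `spark-submit`. The helper `to_conf_args` is hypothetical and exists only for illustration; the property name comes from this README.

```python
# Hypothetical helper (not part of Spark): render settings as spark-submit --conf arguments.
def to_conf_args(settings):
    args = []
    for key, value in settings.items():
        # Render Python booleans as the lowercase true/false used in this README.
        if isinstance(value, bool):
            value = "true" if value else "false"
        args += ["--conf", f"{key}={value}"]
    return args


adaptive_settings = {
    # The one property this README says must be set at minimum.
    "spark.sql.adaptive.enabled": True,
}

print(" ".join(to_conf_args(adaptive_settings)))
# prints: --conf spark.sql.adaptive.enabled=true
```

The same properties could equally be placed in `spark-defaults.conf` or set on the session; the dictionary form is used here only because it is easy to extend with the properties listed in the tables in this README.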

An English version of the design doc is available on Google Docs. A Chinese-language blog post on CSDN introduces the features and benchmark results. SPARK-23128 is the JIRA ticket for contributing this work to Apache Spark.

Auto Setting The Shuffle Partition Number

| Property Name | Default | Meaning |
| --- | --- | --- |
| spark.sql.adaptive.enabled | false | When true, enable adaptive query execution. |
| spark.sql.adaptive.minNumPostShufflePartitions | 1 | The minimum number of post-shuffle partitions used in adaptive execution. This can be used to control the minimum parallelism. |
| spark.sql.adaptive.maxNumPostShufflePartitions | 500 | The maximum number of post-shuffle partitions used in adaptive execution. This is also used as the initial shuffle partition number, so set it to a reasonable value. |
| spark.sql.adaptive.shuffle.targetPostShuffleInputSize | 67108864 | The target post-shuffle input size of a task, in bytes. The default is 64 MB. |
| spark.sql.adaptive.shuffle.targetPostShuffleRowCount | 20000000 | The target post-shuffle row count of a task. This only takes effect if row count information is collected. |
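
To illustrate how a target input size can drive the partition count, here is a simplified sketch that greedily merges adjacent shuffle partitions until the next one would push a group past the target. It is an assumption about the general technique, not the code in this branch: the function name `coalesce_partitions` is hypothetical, the row-count target is ignored, and the minimum partition count is only approximated by lowering the target.

```python
MB = 1024 * 1024


def coalesce_partitions(sizes, target_size=64 * MB, min_partitions=1):
    """Return the start index of each group of adjacent partitions."""
    if not sizes:
        return []
    total = sum(sizes)
    # Lower the target when the data is small, so the result tends toward
    # at least `min_partitions` groups (an approximation, not a guarantee).
    target = min(target_size, max(1, -(-total // min_partitions)))
    starts = [0]
    current = 0
    for i, size in enumerate(sizes):
        # Open a new group when adding this partition would exceed the target.
        if i > 0 and current + size > target:
            starts.append(i)
            current = 0
        current += size
    return starts


# Six initial partitions (sizes in bytes) merged toward a 64 MB target.
sizes = [10 * MB, 20 * MB, 40 * MB, 30 * MB, 5 * MB, 70 * MB]
print(coalesce_partitions(sizes))
# prints: [0, 2, 3, 5]
```

In this sketch the length of `sizes` plays the role of the initial partition number, which is why the maximum should not be set far higher than the job needs: every initial partition still has to be written and tracked before any merging happens.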

Optimizing Join Strategy at Runtime

| Property Name | Default | Meaning |
| --- | --- | --- |
| spark.sql.adaptive.join.enabled | true | When true and spark.sql.adaptive.enabled is true, a better join strategy is determined at runtime. |
| spark.sql.adaptiveBroadcastJoinThreshold | value of spark.sql.autoBroadcastJoinThreshold | Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join in adaptive execution mode. If not set, it equals spark.sql.autoBroadcastJoinThreshold. |
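
The decision this threshold controls can be sketched as a comparison against the sizes observed at runtime. The snippet below is an illustrative assumption, not the branch's planner logic: `choose_join_strategy` is a hypothetical name, the check is made against the smaller input, and sort-merge join is assumed as the fallback.

```python
MB = 1024 * 1024


def choose_join_strategy(left_bytes, right_bytes, broadcast_threshold):
    """Pick a join strategy from the input sizes measured at runtime."""
    # Only the smaller side is a candidate for broadcasting.
    smaller = min(left_bytes, right_bytes)
    if smaller <= broadcast_threshold:
        return "broadcast"
    # Assumed fallback when neither side fits under the threshold.
    return "sort-merge"


# A 5 MB side fits under a 10 MB threshold; a 50 MB side does not.
print(choose_join_strategy(5 * MB, 2048 * MB, 10 * MB))
print(choose_join_strategy(50 * MB, 2048 * MB, 10 * MB))
# prints: broadcast
# prints: sort-merge
```

The point of deciding at runtime is that the sizes passed in are measured after filters and aggregations have run, so a table that looked too large at planning time can still qualify for a broadcast.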

Handling Skewed Join

| Property Name | Default | Meaning |
| --- | --- | --- |
| spark.sql.adaptive.skewedJoin.enabled | false | When true and spark.sql.adaptive.enabled is true, a skewed join is automatically handled at runtime. |
| spark.sql.adaptive.skewedPartitionFactor | 10 | A partition is considered skewed if its size is larger than this factor multiplied by the median partition size and also larger than spark.sql.adaptive.skewedPartitionSizeThreshold, or if its row count is larger than this factor multiplied by the median row count and also larger than spark.sql.adaptive.skewedPartitionRowCountThreshold. |
| spark.sql.adaptive.skewedPartitionSizeThreshold | 67108864 | Configures the minimum size in bytes for a partition to be considered skewed in adaptive skewed join. |
| spark.sql.adaptive.skewedPartitionRowCountThreshold | 10000000 | Configures the minimum row count for a partition to be considered skewed in adaptive skewed join. |
| spark.shuffle.statistics.verbose | false | Collect shuffle statistics in verbose mode, including row counts. This is required for handling skewed joins. |
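
The skew rule described for spark.sql.adaptive.skewedPartitionFactor can be written out directly. The following is a minimal sketch of that rule only, with the default values from the table; `find_skewed_partitions` is a hypothetical name, and how the branch actually splits a skewed partition is not shown.

```python
from statistics import median

MB = 1024 * 1024


def find_skewed_partitions(sizes, row_counts=None, factor=10,
                           size_threshold=64 * MB,
                           row_count_threshold=10000000):
    """Return the indices of partitions that match the skew rule."""
    if not sizes:
        return []
    median_size = median(sizes)
    # Row counts are optional: they exist only when verbose statistics are collected.
    median_rows = median(row_counts) if row_counts else None
    skewed = []
    for i, size in enumerate(sizes):
        # Size test: larger than factor * median AND larger than the size threshold.
        by_size = size > factor * median_size and size > size_threshold
        # Row-count test: the same shape, applied to row counts.
        by_rows = (median_rows is not None
                   and row_counts[i] > factor * median_rows
                   and row_counts[i] > row_count_threshold)
        if by_size or by_rows:
            skewed.append(i)
    return skewed


# Median size is 25 MB, so only the 900 MB partition exceeds 10x the median.
print(find_skewed_partitions([20 * MB, 30 * MB, 25 * MB, 900 * MB, 22 * MB]))
# prints: [3]
```

Requiring both conditions keeps small jobs from being flagged: a partition ten times the median is not worth special handling if it is still below the absolute threshold.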