diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 39b22b6b674a6..4095f058f4d5b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1538,7 +1538,7 @@ object SQLConf {
       .createWithDefault(false)
 
   val V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS =
-    buildConf("spark.sql.sources.v2.bucketing.allow.enabled")
+    buildConf("spark.sql.sources.v2.bucketing.allowCompatibleTransforms.enabled")
       .doc("Whether to allow storage-partition join in the case where the partition transforms " +
         "are compatible but not identical. This config requires both " +
         s"${V2_BUCKETING_ENABLED.key} and ${V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key} to be " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
index 2a49679719d92..fde2c7d7bc373 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
@@ -65,13 +65,13 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
   }
   private val table: String = "tbl"
   private val schema = new StructType()
-      .add("id", IntegerType)
-      .add("data", StringType)
-      .add("ts", TimestampType)
-    private val schema2 = new StructType()
-      .add("store_id", IntegerType)
-      .add("dept_id", IntegerType)
-      .add("data", StringType)
+    .add("id", IntegerType)
+    .add("data", StringType)
+    .add("ts", TimestampType)
+  private val schema2 = new StructType()
+    .add("store_id", IntegerType)
+    .add("dept_id", IntegerType)
+    .add("data", StringType)
 
   test("clustered distribution: output partitioning should be KeyGroupedPartitioning") {
     val partitions: Array[Transform] = Array(Expressions.years("ts"))
@@ -79,9 +79,9 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     // create a table with 3 partitions, partitioned by `years` transform
     createTable(table, schema, partitions)
     sql(s"INSERT INTO testcat.ns.$table VALUES " +
-        s"(0, 'aaa', CAST('2022-01-01' AS timestamp)), " +
-        s"(1, 'bbb', CAST('2021-01-01' AS timestamp)), " +
-        s"(2, 'ccc', CAST('2020-01-01' AS timestamp))")
+      s"(0, 'aaa', CAST('2022-01-01' AS timestamp)), " +
+      s"(1, 'bbb', CAST('2021-01-01' AS timestamp)), " +
+      s"(2, 'ccc', CAST('2020-01-01' AS timestamp))")
 
     var df = sql(s"SELECT count(*) FROM testcat.ns.$table GROUP BY ts")
     val catalystDistribution = physical.ClusteredDistribution(
@@ -129,13 +129,13 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
   test("non-clustered distribution: no V2 catalog") {
    spark.conf.set("spark.sql.catalog.testcat2", classOf[InMemoryTableCatalog].getName)
     val nonFunctionCatalog = spark.sessionState.catalogManager.catalog("testcat2")
-        .asInstanceOf[InMemoryTableCatalog]
+      .asInstanceOf[InMemoryTableCatalog]
     val partitions: Array[Transform] = Array(bucket(32, "ts"))
     createTable(table, schema, partitions, catalog = nonFunctionCatalog)
     sql(s"INSERT INTO testcat2.ns.$table VALUES " +
-        s"(0, 'aaa', CAST('2022-01-01' AS timestamp)), " +
-        s"(1, 'bbb', CAST('2021-01-01' AS timestamp)), " +
-        s"(2, 'ccc', CAST('2020-01-01' AS timestamp))")
+      s"(0, 'aaa', CAST('2022-01-01' AS timestamp)), " +
+      s"(1, 'bbb', CAST('2021-01-01' AS timestamp)), " +
+      s"(2, 'ccc', CAST('2020-01-01' AS timestamp))")
 
     val df = sql(s"SELECT * FROM testcat2.ns.$table")
     val distribution = physical.UnspecifiedDistribution
@@ -153,9 +153,9 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     val partitions: Array[Transform] = Array(bucket(32, "ts"))
     createTable(table, schema, partitions)
     sql(s"INSERT INTO testcat.ns.$table VALUES " +
-        s"(0, 'aaa', CAST('2022-01-01' AS timestamp)), " +
-        s"(1, 'bbb', CAST('2021-01-01' AS timestamp)), " +
-        s"(2, 'ccc', CAST('2020-01-01' AS timestamp))")
+      s"(0, 'aaa', CAST('2022-01-01' AS timestamp)), " +
+      s"(1, 'bbb', CAST('2021-01-01' AS timestamp)), " +
+      s"(2, 'ccc', CAST('2020-01-01' AS timestamp))")
 
     val df = sql(s"SELECT * FROM testcat.ns.$table")
     val distribution = physical.UnspecifiedDistribution
@@ -168,9 +168,9 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     val partitions: Array[Transform] = Array(bucket(32, "ts"))
     createTable(table, schema, partitions)
     sql(s"INSERT INTO testcat.ns.$table VALUES " +
-        s"(0, 'aaa', CAST('2022-01-01' AS timestamp)), " +
-        s"(1, 'bbb', CAST('2021-01-01' AS timestamp)), " +
-        s"(2, 'ccc', CAST('2020-01-01' AS timestamp))")
+      s"(0, 'aaa', CAST('2022-01-01' AS timestamp)), " +
+      s"(1, 'bbb', CAST('2021-01-01' AS timestamp)), " +
+      s"(2, 'ccc', CAST('2020-01-01' AS timestamp))")
 
     val df = sql(s"SELECT * FROM testcat.ns.$table")
     val distribution = physical.ClusteredDistribution(
@@ -204,9 +204,9 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
    * `partitioning`.
    */
   private def checkQueryPlan(
-        df: DataFrame,
-        distribution: physical.Distribution,
-        partitioning: physical.Partitioning): Unit = {
+      df: DataFrame,
+      distribution: physical.Distribution,
+      partitioning: physical.Partitioning): Unit = {
     // check distribution & ordering are correctly populated in logical plan
     val relation = df.queryExecution.optimizedPlan.collect {
       case r: DataSourceV2ScanRelation => r
@@ -230,10 +230,10 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
   }
 
   private def createTable(
-        table: String,
-        schema: StructType,
-        partitions: Array[Transform],
-        catalog: InMemoryTableCatalog = catalog): Unit = {
+      table: String,
+      schema: StructType,
+      partitions: Array[Transform],
+      catalog: InMemoryTableCatalog = catalog): Unit = {
     catalog.createTable(Identifier.of(Array("ns"), table), schema, partitions, emptyProps,
       Distributions.unspecified(), Array.empty, None, None, numRowsPerSplit = 1)
@@ -241,23 +241,23 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
 
   private val customers: String = "customers"
   private val customers_schema = new StructType()
-      .add("customer_name", StringType)
-      .add("customer_age", IntegerType)
-      .add("customer_id", LongType)
+    .add("customer_name", StringType)
+    .add("customer_age", IntegerType)
+    .add("customer_id", LongType)
 
   private val orders: String = "orders"
   private val orders_schema = new StructType()
-      .add("order_amount", DoubleType)
-      .add("customer_id", LongType)
+    .add("order_amount", DoubleType)
+    .add("customer_id", LongType)
 
   private def selectWithMergeJoinHint(t1: String, t2: String): String = {
     s"SELECT /*+ MERGE($t1, $t2) */ "
   }
 
   private def createJoinTestDF(
-        keys: Seq[(String, String)],
-        extraColumns: Seq[String] = Nil,
-        joinType: String = ""): DataFrame = {
+      keys: Seq[(String, String)],
+      extraColumns: Seq[String] = Nil,
+      joinType: String = ""): DataFrame = {
     val extraColList = if (extraColumns.isEmpty) "" else extraColumns.mkString(", ", ", ", "")
     sql(
       s"""
@@ -270,24 +270,24 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
   }
 
   private def testWithCustomersAndOrders(
-        customers_partitions: Array[Transform],
-        orders_partitions: Array[Transform],
-        expectedNumOfShuffleExecs: Int): Unit = {
+      customers_partitions: Array[Transform],
+      orders_partitions: Array[Transform],
+      expectedNumOfShuffleExecs: Int): Unit = {
     createTable(customers, customers_schema, customers_partitions)
     sql(s"INSERT INTO testcat.ns.$customers VALUES " +
-        s"('aaa', 10, 1), ('bbb', 20, 2), ('ccc', 30, 3)")
+      s"('aaa', 10, 1), ('bbb', 20, 2), ('ccc', 30, 3)")
 
     createTable(orders, orders_schema, orders_partitions)
     sql(s"INSERT INTO testcat.ns.$orders VALUES " +
-        s"(100.0, 1), (200.0, 1), (150.0, 2), (250.0, 2), (350.0, 2), (400.50, 3)")
+      s"(100.0, 1), (200.0, 1), (150.0, 2), (250.0, 2), (350.0, 2), (400.50, 3)")
 
     val df = sql(
       s"""
-           |${selectWithMergeJoinHint("c", "o")}
-           |customer_name, customer_age, order_amount
-           |FROM testcat.ns.$customers c JOIN testcat.ns.$orders o
-           |ON c.customer_id = o.customer_id ORDER BY c.customer_id, order_amount
-           |""".stripMargin)
+         |${selectWithMergeJoinHint("c", "o")}
+         |customer_name, customer_age, order_amount
+         |FROM testcat.ns.$customers c JOIN testcat.ns.$orders o
+         |ON c.customer_id = o.customer_id ORDER BY c.customer_id, order_amount
+         |""".stripMargin)
 
     val shuffles = collectShuffles(df.queryExecution.executedPlan)
     assert(shuffles.length == expectedNumOfShuffleExecs)
@@ -334,35 +334,35 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
 
   private val items: String = "items"
   private val items_schema: StructType = new StructType()
-      .add("id", LongType)
-      .add("name", StringType)
-      .add("price", FloatType)
-      .add("arrive_time", TimestampType)
+    .add("id", LongType)
+    .add("name", StringType)
+    .add("price", FloatType)
+    .add("arrive_time", TimestampType)
 
   private val purchases: String = "purchases"
   private val purchases_schema: StructType = new StructType()
-      .add("item_id", LongType)
-      .add("price", FloatType)
-      .add("time", TimestampType)
+    .add("item_id", LongType)
+    .add("price", FloatType)
+    .add("time", TimestampType)
 
   test("partitioned join: join with two partition keys and matching & sorted partitions") {
     val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
     createTable(items, items_schema, items_partitions)
     sql(s"INSERT INTO testcat.ns.$items VALUES " +
-        s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
-        s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
-        s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
-        s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
-        s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
+      s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+      s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
+      s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
+      s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
+      s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
 
     val purchases_partitions = Array(bucket(8, "item_id"), days("time"))
     createTable(purchases, purchases_schema, purchases_partitions)
     sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
-        s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
-        s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
-        s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
-        s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
-        s"(3, 19.5, cast('2020-02-01' as timestamp))")
+      s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
+      s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
+      s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
+      s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
+      s"(3, 19.5, cast('2020-02-01' as timestamp))")
 
     Seq(true, false).foreach { pushDownValues =>
       withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) {
@@ -381,20 +381,20 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
     createTable(items, items_schema, items_partitions)
     sql(s"INSERT INTO testcat.ns.$items VALUES " +
-        s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp)), " +
-        s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
-        s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
-        s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
-        s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp))")
+      s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp)), " +
+      s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+      s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
+      s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
+      s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp))")
 
     val purchases_partitions = Array(bucket(8, "item_id"), days("time"))
     createTable(purchases, purchases_schema, purchases_partitions)
     sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
-        s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
-        s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
-        s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
-        s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
-        s"(3, 19.5, cast('2020-02-01' as timestamp))")
+      s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
+      s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
+      s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
+      s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
+      s"(3, 19.5, cast('2020-02-01' as timestamp))")
 
     Seq(true, false).foreach { pushDownValues =>
       withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) {
@@ -414,15 +414,15 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     createTable(items, items_schema, items_partitions)
     sql(s"INSERT INTO testcat.ns.$items VALUES " +
-        s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
-        s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
-        s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
+      s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+      s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
+      s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
 
     val purchases_partitions = Array(bucket(8, "item_id"), days("time"))
     createTable(purchases, purchases_schema, purchases_partitions)
     sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
-        s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
-        s"(2, 11.0, cast('2020-01-01' as timestamp))")
+      s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
+      s"(2, 11.0, cast('2020-01-01' as timestamp))")
 
     Seq(true, false).foreach { pushDownValues =>
       withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) {
@@ -432,7 +432,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
           assert(shuffles.isEmpty, "should not add shuffle when partition values mismatch")
         } else {
           assert(shuffles.nonEmpty, "should add shuffle when partition values mismatch, and " +
-              "pushing down partition values is not enabled")
+            "pushing down partition values is not enabled")
         }
 
         checkAnswer(df,
@@ -442,21 +442,21 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
   }
 
test("SPARK-41413: partitioned join: partition values from one side are subset of those from " + - "the other side") { + "the other side") { val items_partitions = Array(bucket(4, "id")) createTable(items, items_schema, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + - "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + - "(3, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + - "(4, 'cc', 15.5, cast('2020-02-01' as timestamp))") + "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + "(3, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + + "(4, 'cc', 15.5, cast('2020-02-01' as timestamp))") val purchases_partitions = Array(bucket(4, "item_id")) createTable(purchases, purchases_schema, purchases_partitions) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + - "(1, 42.0, cast('2020-01-01' as timestamp)), " + - "(3, 19.5, cast('2020-02-01' as timestamp))") + "(1, 42.0, cast('2020-01-01' as timestamp)), " + + "(3, 19.5, cast('2020-02-01' as timestamp))") Seq(true, false).foreach { pushDownValues => withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) { @@ -466,7 +466,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { assert(shuffles.isEmpty, "should not add shuffle when partition values mismatch") } else { assert(shuffles.nonEmpty, "should add shuffle when partition values mismatch, and " + - "pushing down partition values is not enabled") + "pushing down partition values is not enabled") } checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 19.5))) @@ -479,16 +479,16 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { createTable(items, items_schema, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + - "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + - "(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + - "(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") + "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + "(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + + "(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") val purchases_partitions = Array(identity("item_id")) createTable(purchases, purchases_schema, purchases_partitions) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + - "(1, 42.0, cast('2020-01-01' as timestamp)), " + - "(2, 19.5, cast('2020-02-01' as timestamp)), " + - "(4, 30.0, cast('2020-02-01' as timestamp))") + "(1, 42.0, cast('2020-01-01' as timestamp)), " + + "(2, 19.5, cast('2020-02-01' as timestamp)), " + + "(4, 30.0, cast('2020-02-01' as timestamp))") Seq(true, false).foreach { pushDownValues => withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) { @@ -498,7 +498,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { assert(shuffles.isEmpty, "should not add shuffle when partition values mismatch") } else { assert(shuffles.nonEmpty, "should add shuffle when partition values mismatch, and " + - "pushing down partition values is not enabled") + "pushing down partition values is not enabled") } checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0), Row(2, "bb", 10.0, 19.5))) @@ -510,16 +510,16 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { val items_partitions = Array(identity("id")) createTable(items, items_schema, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + - "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + - "(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + - "(3, 'cc', 15.5, cast('2020-02-01' 
as timestamp))") + "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + "(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + + "(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") val purchases_partitions = Array(identity("item_id")) createTable(purchases, purchases_schema, purchases_partitions) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + - "(4, 42.0, cast('2020-01-01' as timestamp)), " + - "(5, 19.5, cast('2020-02-01' as timestamp)), " + - "(6, 30.0, cast('2020-02-01' as timestamp))") + "(4, 42.0, cast('2020-01-01' as timestamp)), " + + "(5, 19.5, cast('2020-02-01' as timestamp)), " + + "(6, 30.0, cast('2020-02-01' as timestamp))") Seq(true, false).foreach { pushDownValues => withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) { @@ -529,7 +529,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { assert(shuffles.isEmpty, "should not add shuffle when partition values mismatch") } else { assert(shuffles.nonEmpty, "should add shuffle when partition values mismatch, and " + - "pushing down partition values is not enabled") + "pushing down partition values is not enabled") } checkAnswer(df, Seq.empty) @@ -541,25 +541,25 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { val items_partitions = Array(identity("id")) createTable(items, items_schema, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + - s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + - s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + - s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + + s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") val purchases_partitions = Array(identity("item_id")) createTable(purchases, purchases_schema, purchases_partitions) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + - s"(1, 45.0, cast('2020-01-01' as timestamp)), " + - s"(1, 50.0, cast('2020-01-02' as timestamp)), " + - s"(2, 15.0, cast('2020-01-02' as timestamp)), " + - s"(2, 20.0, cast('2020-01-03' as timestamp)), " + - s"(3, 20.0, cast('2020-02-01' as timestamp))") + s"(1, 45.0, cast('2020-01-01' as timestamp)), " + + s"(1, 50.0, cast('2020-01-02' as timestamp)), " + + s"(2, 15.0, cast('2020-01-02' as timestamp)), " + + s"(2, 20.0, cast('2020-01-03' as timestamp)), " + + s"(3, 20.0, cast('2020-02-01' as timestamp))") Seq(true, false).foreach { pushDownValues => Seq(("true", 5), ("false", 3)).foreach { case (enable, expected) => withSQLConf( - SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString, - SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) { + SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString, + SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) { val df = createJoinTestDF(Seq("id" -> "item_id")) val shuffles = collectShuffles(df.queryExecution.executedPlan) assert(shuffles.isEmpty, "should not contain any shuffle") @@ -575,32 +575,32 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { } test("SPARK-42038: partially clustered: with same partition keys and both sides partially " + - "clustered") { + "clustered") { val items_partitions = Array(identity("id")) createTable(items, items_schema, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + - s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + - s"(1, 'aa', 41.0, cast('2020-01-02' as 
timestamp)), " + - s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + - s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + s"(1, 'aa', 41.0, cast('2020-01-02' as timestamp)), " + + s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + + s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") val purchases_partitions = Array(identity("item_id")) createTable(purchases, purchases_schema, purchases_partitions) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + - s"(1, 45.0, cast('2020-01-01' as timestamp)), " + - s"(1, 50.0, cast('2020-01-02' as timestamp)), " + - s"(1, 55.0, cast('2020-01-02' as timestamp)), " + - s"(2, 15.0, cast('2020-01-02' as timestamp)), " + - s"(2, 20.0, cast('2020-01-03' as timestamp)), " + - s"(2, 22.0, cast('2020-01-03' as timestamp)), " + - s"(3, 20.0, cast('2020-02-01' as timestamp))") + s"(1, 45.0, cast('2020-01-01' as timestamp)), " + + s"(1, 50.0, cast('2020-01-02' as timestamp)), " + + s"(1, 55.0, cast('2020-01-02' as timestamp)), " + + s"(2, 15.0, cast('2020-01-02' as timestamp)), " + + s"(2, 20.0, cast('2020-01-03' as timestamp)), " + + s"(2, 22.0, cast('2020-01-03' as timestamp)), " + + s"(3, 20.0, cast('2020-02-01' as timestamp))") Seq(true, false).foreach { pushDownValues => Seq(("true", 7), ("false", 3)).foreach { case (enable, expected) => withSQLConf( - SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString, - SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) { + SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString, + SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) { val df = createJoinTestDF(Seq("id" -> "item_id")) val shuffles = collectShuffles(df.queryExecution.executedPlan) assert(shuffles.isEmpty, "should not contain any shuffle") @@ -619,35 +619,35 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { } test("SPARK-42038: partially clustered: with different partition keys and both sides partially " + - "clustered") { + "clustered") { val items_partitions = Array(identity("id")) createTable(items, items_schema, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + - s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + - s"(1, 'aa', 41.0, cast('2020-01-02' as timestamp)), " + - s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + - s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp)), " + - s"(4, 'dd', 18.0, cast('2023-01-01' as timestamp))") + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + s"(1, 'aa', 41.0, cast('2020-01-02' as timestamp)), " + + s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + + s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp)), " + + s"(4, 'dd', 18.0, cast('2023-01-01' as timestamp))") val purchases_partitions = Array(identity("item_id")) createTable(purchases, purchases_schema, purchases_partitions) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + - s"(1, 45.0, cast('2020-01-01' as timestamp)), " + - s"(1, 50.0, cast('2020-01-02' as timestamp)), " + - s"(1, 55.0, cast('2020-01-02' as timestamp)), " + - s"(2, 15.0, cast('2020-01-02' as timestamp)), " + - s"(2, 20.0, cast('2020-01-03' as timestamp)), " + - s"(2, 25.0, cast('2020-01-03' as timestamp)), " + - s"(2, 30.0, cast('2020-01-03' as timestamp)), " + - s"(3, 20.0, cast('2020-02-01' as timestamp)), " + - s"(5, 30.0, cast('2023-01-01' as timestamp))") + s"(1, 45.0, cast('2020-01-01' as timestamp)), " + + s"(1, 50.0, cast('2020-01-02' as timestamp)), " 
+ + s"(1, 55.0, cast('2020-01-02' as timestamp)), " + + s"(2, 15.0, cast('2020-01-02' as timestamp)), " + + s"(2, 20.0, cast('2020-01-03' as timestamp)), " + + s"(2, 25.0, cast('2020-01-03' as timestamp)), " + + s"(2, 30.0, cast('2020-01-03' as timestamp)), " + + s"(3, 20.0, cast('2020-02-01' as timestamp)), " + + s"(5, 30.0, cast('2023-01-01' as timestamp))") Seq(true, false).foreach { pushDownValues => Seq(("true", 10), ("false", 5)).foreach { case (enable, expected) => withSQLConf( - SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString, - SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) { + SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString, + SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) { val df = createJoinTestDF(Seq("id" -> "item_id")) val shuffles = collectShuffles(df.queryExecution.executedPlan) if (pushDownValues) { @@ -669,33 +669,33 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { } test("SPARK-42038: partially clustered: with different partition keys and missing keys on " + - "left-hand side") { + "left-hand side") { val items_partitions = Array(identity("id")) createTable(items, items_schema, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + - s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + - s"(1, 'aa', 41.0, cast('2020-01-02' as timestamp)), " + - s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp)), " + - s"(4, 'dd', 18.0, cast('2023-01-01' as timestamp))") + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + s"(1, 'aa', 41.0, cast('2020-01-02' as timestamp)), " + + s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp)), " + + s"(4, 'dd', 18.0, cast('2023-01-01' as timestamp))") val purchases_partitions = Array(identity("item_id")) createTable(purchases, purchases_schema, purchases_partitions) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + - s"(1, 45.0, cast('2020-01-01' as timestamp)), " + - s"(1, 50.0, cast('2020-01-02' as timestamp)), " + - s"(2, 15.0, cast('2020-01-02' as timestamp)), " + - s"(2, 20.0, cast('2020-01-03' as timestamp)), " + - s"(2, 25.0, cast('2020-01-03' as timestamp)), " + - s"(2, 30.0, cast('2020-01-03' as timestamp)), " + - s"(3, 20.0, cast('2020-02-01' as timestamp)), " + - s"(5, 30.0, cast('2023-01-01' as timestamp))") + s"(1, 45.0, cast('2020-01-01' as timestamp)), " + + s"(1, 50.0, cast('2020-01-02' as timestamp)), " + + s"(2, 15.0, cast('2020-01-02' as timestamp)), " + + s"(2, 20.0, cast('2020-01-03' as timestamp)), " + + s"(2, 25.0, cast('2020-01-03' as timestamp)), " + + s"(2, 30.0, cast('2020-01-03' as timestamp)), " + + s"(3, 20.0, cast('2020-02-01' as timestamp)), " + + s"(5, 30.0, cast('2023-01-01' as timestamp))") Seq(true, false).foreach { pushDownValues => Seq(("true", 9), ("false", 5)).foreach { case (enable, expected) => withSQLConf( - SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString, - SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) { + SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString, + SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) { val df = createJoinTestDF(Seq("id" -> "item_id")) val shuffles = collectShuffles(df.queryExecution.executedPlan) if (pushDownValues) { @@ -716,30 +716,30 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { } test("SPARK-42038: partially clustered: with different partition keys and missing keys on 
" + - "right-hand side") { + "right-hand side") { val items_partitions = Array(identity("id")) createTable(items, items_schema, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + - s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + - s"(1, 'aa', 41.0, cast('2020-01-02' as timestamp)), " + - s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + - s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + s"(1, 'aa', 41.0, cast('2020-01-02' as timestamp)), " + + s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + + s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") val purchases_partitions = Array(identity("item_id")) createTable(purchases, purchases_schema, purchases_partitions) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + - s"(2, 15.0, cast('2020-01-02' as timestamp)), " + - s"(2, 20.0, cast('2020-01-03' as timestamp)), " + - s"(3, 20.0, cast('2020-02-01' as timestamp)), " + - s"(4, 25.0, cast('2020-02-01' as timestamp)), " + - s"(5, 30.0, cast('2023-01-01' as timestamp))") + s"(2, 15.0, cast('2020-01-02' as timestamp)), " + + s"(2, 20.0, cast('2020-01-03' as timestamp)), " + + s"(3, 20.0, cast('2020-02-01' as timestamp)), " + + s"(4, 25.0, cast('2020-02-01' as timestamp)), " + + s"(5, 30.0, cast('2023-01-01' as timestamp))") Seq(true, false).foreach { pushDownValues => Seq(("true", 6), ("false", 5)).foreach { case (enable, expected) => withSQLConf( - SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString, - SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) { + SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString, + SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) { val df = createJoinTestDF(Seq("id" -> "item_id")) val shuffles = collectShuffles(df.queryExecution.executedPlan) if (pushDownValues) { @@ -761,19 +761,19 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { val items_partitions = Array(identity("id")) createTable(items, items_schema, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + - s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + - s"(1, 'aa', 41.0, cast('2020-01-02' as timestamp)), " + - s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + - s"(2, 'bb', 15.0, cast('2020-01-02' as timestamp)), " + - s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + s"(1, 'aa', 41.0, cast('2020-01-02' as timestamp)), " + + s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + + s"(2, 'bb', 15.0, cast('2020-01-02' as timestamp)), " + + s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") val purchases_partitions = Array(identity("item_id")) createTable(purchases, purchases_schema, purchases_partitions) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + - s"(2, 20.0, cast('2020-01-01' as timestamp)), " + - s"(3, 20.0, cast('2020-02-01' as timestamp)), " + - s"(4, 25.0, cast('2020-02-01' as timestamp)), " + - s"(5, 30.0, cast('2023-01-01' as timestamp))") + s"(2, 20.0, cast('2020-01-01' as timestamp)), " + + s"(3, 20.0, cast('2020-02-01' as timestamp)), " + + s"(4, 25.0, cast('2020-02-01' as timestamp)), " + + s"(5, 30.0, cast('2023-01-01' as timestamp))") // In a left-outer join, and when the left side has larger stats, partially clustered // distribution should kick in and pick the right hand side to replicate partitions. 
@@ -791,7 +791,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
            assert(shuffles.isEmpty, "should not contain any shuffle")
            val scans = collectScans(df.queryExecution.executedPlan)
            assert(scans.forall(_.inputRDD.partitions.length == expected),
-               s"Expected $expected but got ${scans.head.inputRDD.partitions.length}")
+             s"Expected $expected but got ${scans.head.inputRDD.partitions.length}")
          } else {
            assert(shuffles.nonEmpty,
              "should contain shuffle when not pushing down partition values")
@@ -808,20 +808,20 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     val items_partitions = Array(identity("id"))
     createTable(items, items_schema, items_partitions)
     sql(s"INSERT INTO testcat.ns.$items VALUES " +
-        s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
-        s"(1, 'aa', 41.0, cast('2020-01-02' as timestamp)), " +
-        s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
-        s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
+      s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+      s"(1, 'aa', 41.0, cast('2020-01-02' as timestamp)), " +
+      s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
+      s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
 
     val purchases_partitions = Array(identity("item_id"))
     createTable(purchases, purchases_schema, purchases_partitions)
     sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
-        s"(1, 45.0, cast('2020-01-01' as timestamp)), " +
-        s"(2, 15.0, cast('2020-01-01' as timestamp)), " +
-        s"(2, 20.0, cast('2020-01-01' as timestamp)), " +
-        s"(3, 20.0, cast('2020-02-01' as timestamp)), " +
-        s"(4, 25.0, cast('2020-02-01' as timestamp)), " +
-        s"(5, 30.0, cast('2023-01-01' as timestamp))")
+      s"(1, 45.0, cast('2020-01-01' as timestamp)), " +
+      s"(2, 15.0, cast('2020-01-01' as timestamp)), " +
+      s"(2, 20.0, cast('2020-01-01' as timestamp)), " +
+      s"(3, 20.0, cast('2020-02-01' as timestamp)), " +
+      s"(4, 25.0, cast('2020-02-01' as timestamp)), " +
+      s"(5, 30.0, cast('2023-01-01' as timestamp))")
 
     // The left-hand side is picked as the side to replicate partitions based on stats, but since
     // this is right outer join, partially clustered distribution won't kick in, and Spark should
@@ -859,20 +859,20 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     val items_partitions = Array(identity("id"))
     createTable(items, items_schema, items_partitions)
     sql(s"INSERT INTO testcat.ns.$items VALUES " +
-        s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
-        s"(1, 'aa', 41.0, cast('2020-01-02' as timestamp)), " +
-        s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
-        s"(3, 'cc', 15.5, cast('2020-01-01' as timestamp))")
+      s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+      s"(1, 'aa', 41.0, cast('2020-01-02' as timestamp)), " +
+      s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
+      s"(3, 'cc', 15.5, cast('2020-01-01' as timestamp))")
 
     val purchases_partitions = Array(identity("item_id"))
     createTable(purchases, purchases_schema, purchases_partitions)
     sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
-        s"(1, 45.0, cast('2020-01-01' as timestamp)), " +
-        s"(2, 15.0, cast('2020-01-01' as timestamp)), " +
-        s"(2, 20.0, cast('2020-01-02' as timestamp)), " +
-        s"(3, 20.0, cast('2020-01-01' as timestamp)), " +
-        s"(4, 25.0, cast('2020-01-01' as timestamp)), " +
-        s"(5, 30.0, cast('2023-01-01' as timestamp))")
+      s"(1, 45.0, cast('2020-01-01' as timestamp)), " +
+      s"(2, 15.0, cast('2020-01-01' as timestamp)), " +
+      s"(2, 20.0, cast('2020-01-02' as timestamp)), " +
+      s"(3, 20.0, cast('2020-01-01' as timestamp)), " +
+      s"(4, 25.0, cast('2020-01-01' as timestamp)), " +
+      s"(5, 30.0, cast('2023-01-01' as timestamp))")
 
     Seq(true, false).foreach { pushDownValues =>
       Seq(("true", 5), ("false", 5)).foreach {
@@ -905,41 +905,41 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
   test("data source partitioning + dynamic partition filtering") {
     withSQLConf(
-        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
-        SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
-        SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
-        SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "10") {
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+      SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+      SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "10") {
       val items_partitions = Array(identity("id"))
       createTable(items, items_schema, items_partitions)
       sql(s"INSERT INTO testcat.ns.$items VALUES " +
-          s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
-          s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
-          s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
-          s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
-          s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
+        s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+        s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
+        s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
+        s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
+        s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
 
       val purchases_partitions = Array(identity("item_id"))
      createTable(purchases, purchases_schema, purchases_partitions)
      sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
-          s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
-          s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
-          s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
-          s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
-          s"(3, 19.5, cast('2020-02-01' as timestamp))")
+        s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
+        s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
+        s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
+        s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
+        s"(3, 19.5, cast('2020-02-01' as timestamp))")
 
       Seq(true, false).foreach { pushDownValues =>
        withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) {
          // number of unique partitions changed after dynamic filtering - the gap should be filled
          // with empty partitions and the job should still succeed
          var df = sql(s"SELECT sum(p.price) from testcat.ns.$items i, testcat.ns.$purchases p " +
-             "WHERE i.id = p.item_id AND i.price > 40.0")
+           "WHERE i.id = p.item_id AND i.price > 40.0")
          checkAnswer(df, Seq(Row(131)))
 
          // dynamic filtering doesn't change partitioning so storage-partitioned join should kick
          // in
          df = sql(s"SELECT sum(p.price) from testcat.ns.$items i, testcat.ns.$purchases p " +
-             "WHERE i.id = p.item_id AND i.price >= 10.0")
+           "WHERE i.id = p.item_id AND i.price >= 10.0")
          val shuffles = collectShuffles(df.queryExecution.executedPlan)
          assert(shuffles.isEmpty, "should not add shuffle for both sides of the join")
          checkAnswer(df, Seq(Row(303.5)))
@@ -952,48 +952,48 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     val items_partitions = Array(identity("id"))
     createTable(items, items_schema, items_partitions)
     sql(s"INSERT INTO testcat.ns.$items VALUES " +
-        s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
-        s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
-        s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
-        s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
-        s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp)), " +
-        s"(4, 'dd', 18.0, cast('2023-01-01' as timestamp))")
+      s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+      s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
+      s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
+      s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
+      s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp)), " +
+      s"(4, 'dd', 18.0, cast('2023-01-01' as timestamp))")
 
     val purchases_partitions = Array(identity("item_id"))
     createTable(purchases, purchases_schema, purchases_partitions)
     sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
-        s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
-        s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
-        s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
-        s"(1, 50.0, cast('2020-01-15' as timestamp)), " +
-        s"(1, 55.0, cast('2020-01-15' as timestamp)), " +
-        s"(1, 60.0, cast('2020-01-15' as timestamp)), " +
-        s"(1, 65.0, cast('2020-01-15' as timestamp)), " +
-        s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
-        s"(3, 19.5, cast('2020-02-01' as timestamp)), " +
-        s"(5, 25.0, cast('2023-01-01' as timestamp)), " +
-        s"(5, 26.0, cast('2023-01-01' as timestamp)), " +
-        s"(5, 28.0, cast('2023-01-01' as timestamp)), " +
-        s"(6, 50.0, cast('2023-02-01' as timestamp)), " +
-        s"(6, 50.0, cast('2023-02-01' as timestamp))")
+      s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
+      s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
+      s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
+      s"(1, 50.0, cast('2020-01-15' as timestamp)), " +
+      s"(1, 55.0, cast('2020-01-15' as timestamp)), " +
+      s"(1, 60.0, cast('2020-01-15' as timestamp)), " +
+      s"(1, 65.0, cast('2020-01-15' as timestamp)), " +
+      s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
+      s"(3, 19.5, cast('2020-02-01' as timestamp)), " +
+      s"(5, 25.0, cast('2023-01-01' as timestamp)), " +
+      s"(5, 26.0, cast('2023-01-01' as timestamp)), " +
+      s"(5, 28.0, cast('2023-01-01' as timestamp)), " +
+      s"(6, 50.0, cast('2023-02-01' as timestamp)), " +
+      s"(6, 50.0, cast('2023-02-01' as timestamp))")
 
     Seq(true, false).foreach { pushDownValues =>
       Seq(("true", 15), ("false", 6)).foreach { case (enable, expected) =>
         withSQLConf(
-            SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-            SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
-            SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
-            SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
-            SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "10",
-            SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString,
-            SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) {
+          SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+          SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+          SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+          SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+          SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "10",
+          SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString,
+          SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) {
           // storage-partitioned join should kick in and fill the missing partitions & splits
          // after dynamic filtering with empty partitions & splits, respectively.
          val df = sql(s"SELECT sum(p.price) from " +
-             s"testcat.ns.$purchases p, testcat.ns.$items i WHERE " +
-             s"p.item_id = i.id AND p.price < 45.0")
+           s"testcat.ns.$purchases p, testcat.ns.$items i WHERE " +
+           s"p.item_id = i.id AND p.price < 45.0")
          checkAnswer(df, Seq(Row(213.5)))
 
          val shuffles = collectShuffles(df.queryExecution.executedPlan)
@@ -1179,7 +1179,8 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
   test("SPARK-44641: duplicated records when SPJ is not triggered") {
     val items_partitions = Array(bucket(8, "id"))
     createTable(items, items_schema, items_partitions)
-    sql(s"""
+    sql(
+      s"""
         INSERT INTO testcat.ns.$items VALUES
         (1, 'aa', 40.0, cast('2020-01-01' as timestamp)),
        (1, 'aa', 41.0, cast('2020-01-15' as timestamp)),
@@ -1189,7 +1190,8 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     val purchases_partitions = Array(bucket(8, "item_id"))
     createTable(purchases, purchases_schema, purchases_partitions)
-    sql(s"""INSERT INTO testcat.ns.$purchases VALUES
+    sql(
+      s"""INSERT INTO testcat.ns.$purchases VALUES
         (1, 42.0, cast('2020-01-01' as timestamp)),
         (1, 44.0, cast('2020-01-15' as timestamp)),
         (1, 45.0, cast('2020-01-15' as timestamp)),
@@ -1201,7 +1203,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
         withSQLConf(
           SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString,
           SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key ->
-              partiallyClusteredEnabled.toString) {
+            partiallyClusteredEnabled.toString) {
 
           // join keys are not the same as the partition keys, therefore SPJ is not triggered.
           val df = createJoinTestDF(Seq("arrive_time" -> "time"), extraColumns = Seq("p.item_id"))
@@ -1227,29 +1229,29 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
   }
 
   test("SPARK-44647: test join key is subset of cluster key " +
-      "with push values and partially-clustered") {
+    "with push values and partially-clustered") {
     val table1 = "tab1e1"
     val table2 = "table2"
     val partition = Array(identity("id"), identity("data"))
     createTable(table1, schema, partition)
     sql(s"INSERT INTO testcat.ns.$table1 VALUES " +
-        "(1, 'aa', cast('2020-01-01' as timestamp)), " +
-        "(2, 'bb', cast('2020-01-01' as timestamp)), " +
-        "(2, 'cc', cast('2020-01-01' as timestamp)), " +
-        "(3, 'dd', cast('2020-01-01' as timestamp)), " +
-        "(3, 'dd', cast('2020-01-01' as timestamp)), " +
-        "(3, 'ee', cast('2020-01-01' as timestamp)), " +
-        "(3, 'ee', cast('2020-01-01' as timestamp))")
+      "(1, 'aa', cast('2020-01-01' as timestamp)), " +
+      "(2, 'bb', cast('2020-01-01' as timestamp)), " +
+      "(2, 'cc', cast('2020-01-01' as timestamp)), " +
+      "(3, 'dd', cast('2020-01-01' as timestamp)), " +
+      "(3, 'dd', cast('2020-01-01' as timestamp)), " +
+      "(3, 'ee', cast('2020-01-01' as timestamp)), " +
+      "(3, 'ee', cast('2020-01-01' as timestamp))")
 
     createTable(table2, schema, partition)
     sql(s"INSERT INTO testcat.ns.$table2 VALUES " +
-        "(4, 'zz', cast('2020-01-01' as timestamp)), " +
-        "(4, 'zz', cast('2020-01-01' as timestamp)), " +
-        "(3, 'yy', cast('2020-01-01' as timestamp)), " +
-        "(3, 'yy', cast('2020-01-01' as timestamp)), " +
-        "(3, 'xx', cast('2020-01-01' as timestamp)), " +
-        "(3, 'xx', cast('2020-01-01' as timestamp)), " +
-        "(2, 'ww', cast('2020-01-01' as timestamp))")
+      "(4, 'zz', cast('2020-01-01' as timestamp)), " +
+      "(4, 'zz', cast('2020-01-01' as timestamp)), " +
+      "(3, 'yy', cast('2020-01-01' as timestamp)), " +
+      "(3, 'yy', cast('2020-01-01' as timestamp)), " +
+      "(3, 'xx', cast('2020-01-01' as timestamp)), " +
+      "(3, 'xx', cast('2020-01-01' as timestamp)), " +
+      "(2, 'ww', cast('2020-01-01' as timestamp))")
 
     Seq(true, false).foreach { pushDownValues =>
       Seq(true, false).foreach { partiallyClustered =>
@@ -1259,16 +1261,16 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
           SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
           SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString,
           SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key ->
-              partiallyClustered.toString,
+            partiallyClustered.toString,
           SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key ->
-              allowJoinKeysSubsetOfPartitionKeys.toString) {
+            allowJoinKeysSubsetOfPartitionKeys.toString) {
           val df = sql(
             s"""
-                 |${selectWithMergeJoinHint("t1", "t2")}
-                 |t1.id AS id, t1.data AS t1data, t2.data AS t2data
-                 |FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2
-                 |ON t1.id = t2.id ORDER BY t1.id, t1data, t2data
-                 |""".stripMargin)
+               |${selectWithMergeJoinHint("t1", "t2")}
+               |t1.id AS id, t1.data AS t1data, t2.data AS t2data
+               |FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2
+               |ON t1.id = t2.id ORDER BY t1.id, t1data, t2data
+               |""".stripMargin)
           val shuffles = collectShuffles(df.queryExecution.executedPlan)
           if (allowJoinKeysSubsetOfPartitionKeys) {
             assert(shuffles.isEmpty, "SPJ should be triggered")
@@ -1277,7 +1279,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
           }
 
           val scans = collectScans(df.queryExecution.executedPlan)
-              .map(_.inputRDD.partitions.length)
+            .map(_.inputRDD.partitions.length)
 
          (allowJoinKeysSubsetOfPartitionKeys, partiallyClustered) match {
            // SPJ and partially-clustered
@@ -1377,14 +1379,14 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
        "(5, 4, 'bo'), " +
        "(5, 5, 'bp')"
 
-        // additional unmatched partitions to test push down
-        val finalStr = if (tab == table1) {
-          insertStr ++ ", (8, 0, 'xa'), (8, 8, 'xx')"
-        } else {
-          insertStr ++ ", (9, 0, 'ya'), (9, 9, 'yy')"
-        }
+      // additional unmatched partitions to test push down
+      val finalStr = if (tab == table1) {
+        insertStr ++ ", (8, 0, 'xa'), (8, 8, 'xx')"
+      } else {
+        insertStr ++ ", (9, 0, 'ya'), (9, 9, 'yy')"
+      }
 
-        sql(finalStr)
+      sql(finalStr)
     }
 
     Seq(true, false).foreach { allowJoinKeysSubsetOfPartitionKeys =>
@@ -1552,11 +1554,67 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
           Row(6, 1, 5, 5, "ag", "07"),
         ))
     }
-    }
+  }
+
+  test("SPARK-47094: Support compatible buckets with different partitionKeys and one joinKey") {
+    val table1 = "tab1e1"
+    val table2 = "table2"
+
+    Seq((2, 4), (4, 2), (2, 6), (6, 2)).foreach {
+      case (table1buckets, table2buckets) =>
+        catalog.clearTables()
+
+        val partition1 = Array(Expressions.years("ts"),
+          bucket(table1buckets, "id"))
+        val partition2 = Array(Expressions.days("ts"),
+          bucket(table2buckets, "id"))
+
+        createTable(table1, schema, partition1)
+
+        sql(s"INSERT INTO testcat.ns.$table1 VALUES " +
+          "(1, 'aa', cast('2020-01-01' as timestamp)), " +
+          "(2, 'bb', cast('2020-01-02' as timestamp)), " +
+          "(3, 'cc', cast('2020-01-03' as timestamp))")
+
+        createTable(table2, schema, partition2)
+
+        sql(s"INSERT INTO testcat.ns.$table2 VALUES " +
+          "(3, 'aa', cast('2020-01-01' as timestamp)), " +
+          "(5, 'bb', cast('2020-01-02' as timestamp)), " +
+          "(6, 'cc', cast('2020-01-03' as timestamp))")
+
+        withSQLConf(
+          SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
+          SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
+          SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false",
+          SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true",
+          SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") {
+          val df = sql(
+            s"""
+               |${selectWithMergeJoinHint("t1", "t2")}
+               |t1.id, t2.id, t1.ts, t2.ts, t1.data, t2.data
+               |FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2
+               |ON t1.id = t2.id
+               |ORDER BY t1.ts
+               |""".stripMargin)
+
+          val shuffles = collectShuffles(df.queryExecution.executedPlan)
+          assert(shuffles.isEmpty, "SPJ should be triggered")
+
+          val scans = collectScans(df.queryExecution.executedPlan)
+            .map(_.inputRDD.partitions.length)
+
+          val expectedBuckets = Math.min(table1buckets, table2buckets)
+
+          assert(scans == Seq(expectedBuckets, expectedBuckets))
+        }
+    }
+  }
 
-  test("SPARK-47094: Compatible buckets does not support SPJ with " +
-      "push-down values or partially-clustered") {
+  test("SPARK-47094: Compatible buckets does not support SPJ " +
+    "with push-down values or partially-clustered") {
     val table1 = "tab1e1"
     val table2 = "table2"