From dceea139a01e571323539cac083ad10dfc4dfec9 Mon Sep 17 00:00:00 2001
From: hpal
Date: Mon, 26 Feb 2024 18:27:39 -0800
Subject: [PATCH] add a test case with `years` and `days` partition transforms

---
 .../KeyGroupedPartitioningSuite.scala              | 56 +++++++++++++++++++
 1 file changed, 56 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
index 692a898730c8c..5ea96430c58a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
@@ -1474,6 +1474,62 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     }
   }
 
+  test("SPARK-47094: Support compatible buckets with different partition keys and one join key") {
+    val table1 = "table1"
+    val table2 = "table2"
+
+    Seq((2, 4), (4, 2), (2, 6), (6, 2)).foreach {
+      case (table1buckets, table2buckets) =>
+        catalog.clearTables()
+
+        val partition1 = Array(Expressions.years("ts"),
+          bucket(table1buckets, "id"))
+        val partition2 = Array(Expressions.days("ts"),
+          bucket(table2buckets, "id"))
+
+        createTable(table1, schema, partition1)
+
+        sql(s"INSERT INTO testcat.ns.$table1 VALUES " +
+          "(1, 'aa', cast('2020-01-01' as timestamp)), " +
+          "(2, 'bb', cast('2020-01-02' as timestamp)), " +
+          "(3, 'cc', cast('2020-01-03' as timestamp))")
+
+        createTable(table2, schema, partition2)
+
+        sql(s"INSERT INTO testcat.ns.$table2 VALUES " +
+          "(3, 'aa', cast('2020-01-01' as timestamp)), " +
+          "(5, 'bb', cast('2020-01-02' as timestamp)), " +
+          "(6, 'cc', cast('2020-01-03' as timestamp))")
+
+        withSQLConf(
+          SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
+          SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
+          SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false",
+          SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true",
+          SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") {
+          val df = sql(
+            s"""
+               |${selectWithMergeJoinHint("t1", "t2")}
+               |t1.id, t2.id, t1.ts, t2.ts, t1.data, t2.data
+               |FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2
+               |ON t1.id = t2.id
+               |ORDER BY t1.ts
+               |""".stripMargin)
+
+          val shuffles = collectShuffles(df.queryExecution.executedPlan)
+          assert(shuffles.isEmpty, "SPJ should be triggered")
+
+          val scans = collectScans(df.queryExecution.executedPlan)
+            .map(_.inputRDD.partitions.length)
+
+          // Both scans should be coalesced down to the smaller bucket count.
+          val expectedBuckets = Math.min(table1buckets, table2buckets)
+
+          assert(scans == Seq(expectedBuckets, expectedBuckets))
+        }
+    }
+  }
+
   test("SPARK-47094: Support compatible buckets with less join keys than partition keys") {
     val table1 = "tab1e1"
     val table2 = "table2"
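
Reviewer note (not part of the patch): the `expectedBuckets` assertion relies on the behavior tested by SPARK-47094, where two bucket transforms are treated as compatible when one bucket count evenly divides the other, and the join sides are coalesced down to the smaller count under SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS. A minimal standalone sketch of that arithmetic follows; `areCompatible` and `coalescedCount` are hypothetical helper names for illustration, not Spark APIs.

    // Standalone sketch, not Spark's internal API: models why every pair in
    // Seq((2, 4), (4, 2), (2, 6), (6, 2)) yields two grouped partitions per scan.
    object CompatibleBucketsSketch {
      // Hypothetical helper: bucket counts are compatible when one divides the other.
      def areCompatible(a: Int, b: Int): Boolean =
        a > 0 && b > 0 && (a % b == 0 || b % a == 0)

      // Hypothetical helper: both join sides coalesce to the smaller bucket count.
      def coalescedCount(a: Int, b: Int): Int = math.min(a, b)

      def main(args: Array[String]): Unit = {
        Seq((2, 4), (4, 2), (2, 6), (6, 2)).foreach { case (b1, b2) =>
          assert(areCompatible(b1, b2))
          println(s"buckets ($b1, $b2) -> ${coalescedCount(b1, b2)} partitions per scan")
        }
      }
    }

Every pair in the test happens to coalesce to 2, which is why the originally hardcoded Math.min(2, 4) passed; computing the expectation from table1buckets and table2buckets makes the intent explicit and keeps the assertion correct if the pairs change.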