Skip to content

Commit

Permalink
add the test case with years and days as one partition
Browse files Browse the repository at this point in the history
  • Loading branch information
hpal committed Feb 27, 2024
1 parent c5120fa commit dceea13
Showing 1 changed file with 56 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1474,6 +1474,62 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
}
}

test("SPARK-47094: Support compatible buckets with different partitionKeys and one joinKey ") {
val table1 = "tab1e1"
val table2 = "table2"

Seq((2, 4), (4, 2), (2, 6), (6, 2)).foreach {
case (table1buckets, table2buckets) =>
catalog.clearTables()

val partition1 = Array(Expressions.years("ts"),
bucket(table1buckets, "id"))
val partition2 = Array(Expressions.days("ts"),
bucket(table2buckets, "id"))

createTable(table1, schema, partition1)

sql(s"INSERT INTO testcat.ns.$table1 VALUES " +
"(1, 'aa', cast('2020-01-01' as timestamp)), " +
"(2, 'bb', cast('2020-01-02' as timestamp)), " +
"(3, 'cc', cast('2020-01-03' as timestamp))")

createTable(table2, schema, partition2)

sql(s"INSERT INTO testcat.ns.$table2 VALUES " +
"(3, 'aa', cast('2020-01-01' as timestamp)), " +
"(5, 'bb', cast('2020-01-02' as timestamp)), " +
"(6, 'cc', cast('2020-01-03' as timestamp))")

withSQLConf(
SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false",
SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true",
SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") {
val df = sql(
s"""
|${selectWithMergeJoinHint("t1", "t2")}
|t1.id, t2.id, t1.ts, t2.ts, t1.data, t2.data
|FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2
|ON t1.id = t2.id
|ORDER BY t1.ts
|""".stripMargin)

val shuffles = collectShuffles(df.queryExecution.executedPlan)
assert(shuffles.isEmpty, "SPJ should be triggered")

val scans = collectScans(df.queryExecution.executedPlan).map(_.inputRDD.
partitions.length)

val expectedBuckets = Math.min(2, 4)

assert(scans == Seq(expectedBuckets, expectedBuckets))
// Not coming till point
}
}
}

test("SPARK-47094: Support compatible buckets with less join keys than partition keys") {
val table1 = "tab1e1"
val table2 = "table2"
Expand Down

0 comments on commit dceea13

Please sign in to comment.