From f9bf404ce63064d8c94d1bbfc1c680399c7c3149 Mon Sep 17 00:00:00 2001 From: Aitozi Date: Thu, 6 Feb 2025 14:22:01 +0800 Subject: [PATCH] [bug][spark] fix ambiguous __paimon_file_path when merging from paimon table --- .../spark/commands/MergeIntoPaimonTable.scala | 11 ++++++---- .../paimon/spark/commands/PaimonCommand.scala | 5 +++-- .../spark/sql/MergeIntoTableTestBase.scala | 22 +++++++++++++++++++ 3 files changed, 32 insertions(+), 6 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala index 52e704172fc8..3df95917ab65 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala @@ -152,13 +152,16 @@ case class MergeIntoPaimonTable( } if (hasUpdate(matchedActions)) { touchedFilePathsSet ++= findTouchedFiles( - targetDS.join(sourceDS, toColumn(mergeCondition), "inner"), - sparkSession) + targetDS.alias("_left").join(sourceDS, toColumn(mergeCondition), "inner"), + sparkSession, + "_left." + FILE_PATH_COLUMN + ) } if (hasUpdate(notMatchedBySourceActions)) { touchedFilePathsSet ++= findTouchedFiles( - targetDS.join(sourceDS, toColumn(mergeCondition), "left_anti"), - sparkSession) + targetDS.alias("_left").join(sourceDS, toColumn(mergeCondition), "left_anti"), + sparkSession, + "_left." 
+ FILE_PATH_COLUMN) } val targetFilePaths: Array[String] = findTouchedFiles(targetDS, sparkSession) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala index 28ac1623fb59..3585f74b82a2 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala @@ -126,10 +126,11 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLCon protected def findTouchedFiles( dataset: Dataset[Row], - sparkSession: SparkSession): Array[String] = { + sparkSession: SparkSession, + identifier: String = FILE_PATH_COLUMN): Array[String] = { import sparkSession.implicits._ dataset - .select(FILE_PATH_COLUMN) + .select(identifier) .distinct() .as[String] .collect() diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala index bcd84fdc11da..291945a055f7 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala @@ -48,6 +48,28 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase with PaimonTab } } + test(s"Paimon MergeInto: two paimon tables") { + withTable("source", "target") { + createTable("target", "a INT, b INT, c STRING", Seq("a")) + createTable("source", "a INT, b INT, c STRING", Seq("a")) + + spark.sql("INSERT INTO source values (1, 100, 'c11'), (3, 300, 'c33')") + spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')") + + spark.sql(s""" + |MERGE INTO target + |USING source + |ON target.a = source.a + 
|WHEN MATCHED THEN + |UPDATE SET a = source.a, b = source.b, c = source.c + |""".stripMargin) + + checkAnswer( + spark.sql("SELECT * FROM target ORDER BY a, b"), + Row(1, 100, "c11") :: Row(2, 20, "c2") :: Nil) + } + } + test(s"Paimon MergeInto: only delete") { withTable("source", "target") {