From 553cdb4058ad123aa8d10b40692f8f4a7c13a4f8 Mon Sep 17 00:00:00 2001
From: Yann
Date: Wed, 12 Feb 2025 11:25:24 +0800
Subject: [PATCH] update

---
 .../paimon/spark/commands/WriteIntoPaimonTable.scala | 10 ++--------
 .../main/scala/org/apache/spark/sql/PaimonUtils.scala |  6 +++++-
 2 files changed, 7 insertions(+), 9 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index e7ffdf10b885..eae8b4f14658 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -29,11 +29,10 @@ import org.apache.paimon.table.sink.CommitMessage
 import org.apache.paimon.utils.{InternalRowPartitionComputer, PartitionPathUtils, TypeUtils}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.{DataFrame, PaimonUtils, Row, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.functions.{col, lit}
-import org.apache.spark.sql.types.StructType
 
 import scala.collection.JavaConverters._
 
@@ -59,7 +58,7 @@ case class WriteIntoPaimonTable(
 
     // For case that some columns is absent in data, we still allow to write once write.merge-schema is true.
     val newTableSchema = SparkTypeUtils.fromPaimonRowType(table.schema().logicalRowType())
-    if (!schemaEqualsIgnoreNullability(newTableSchema, dataSchema)) {
+    if (!PaimonUtils.sameType(newTableSchema, dataSchema)) {
       val resolve = sparkSession.sessionState.conf.resolver
       val cols = newTableSchema.map {
         field =>
@@ -129,11 +128,6 @@ case class WriteIntoPaimonTable(
     (dynamicPartitionOverwriteMode, overwritePartition)
   }
 
-  private def schemaEqualsIgnoreNullability(s1: StructType, s2: StructType): Boolean = {
-    def ignoreNullable(s: StructType) = StructType(s.fields.map(_.copy(nullable = true)))
-    ignoreNullable(s1) == ignoreNullable(s2)
-  }
-
   override def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): LogicalPlan =
     this.asInstanceOf[WriteIntoPaimonTable]
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
index a1ce25137436..9023bfa64666 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.translateFilterV2WithMapping
 import org.apache.spark.sql.internal.connector.PredicateUtils
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.util.PartitioningUtils
 import org.apache.spark.util.{Utils => SparkUtils}
 
@@ -121,4 +121,8 @@ object PaimonUtils {
       partitionColumnNames: Seq[String]): Unit = {
     PartitioningUtils.requireExactMatchedPartitionSpec(tableName, spec, partitionColumnNames)
   }
+
+  def sameType(left: DataType, right: DataType): Boolean = {
+    left.sameType(right)
+  }
 }
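
Note on the change: the removed schemaEqualsIgnoreNullability helper compared two StructTypes after forcing every field to nullable, and Spark's DataType.sameType performs the same nullability-ignoring comparison. Since sameType does not appear to be callable from outside Spark's own packages, the patch routes it through PaimonUtils, which already lives under org.apache.spark.sql. The short sketch below only illustrates that behavior; the object name SameTypeSketch and the example schemas are made up for illustration and are not part of the patch.

package org.apache.spark.sql

import org.apache.spark.sql.types._

// Illustrative sketch, declared inside org.apache.spark.sql so that the
// (Spark-internal) DataType.sameType is visible, mirroring the PaimonUtils trick.
object SameTypeSketch {
  def main(args: Array[String]): Unit = {
    val tableSchema = StructType(Seq(StructField("id", LongType, nullable = false)))
    val dataSchema = StructType(Seq(StructField("id", LongType, nullable = true)))

    // StructType equality takes nullability into account, so these differ ...
    println(tableSchema == dataSchema) // false
    // ... while sameType ignores nullability, matching the removed helper's intent.
    println(tableSchema.sameType(dataSchema)) // true
  }
}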