Commit 553cdb4: update
YannByron committed Feb 12, 2025
1 parent: 328e3ac

Showing 2 changed files with 7 additions and 9 deletions.
WriteIntoPaimonTable.scala

@@ -29,11 +29,10 @@ import org.apache.paimon.table.sink.CommitMessage
 import org.apache.paimon.utils.{InternalRowPartitionComputer, PartitionPathUtils, TypeUtils}

 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.{DataFrame, PaimonUtils, Row, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.functions.{col, lit}
-import org.apache.spark.sql.types.StructType

 import scala.collection.JavaConverters._
@@ -59,7 +58,7 @@ case class WriteIntoPaimonTable(

     // For case that some columns is absent in data, we still allow to write once write.merge-schema is true.
     val newTableSchema = SparkTypeUtils.fromPaimonRowType(table.schema().logicalRowType())
-    if (!schemaEqualsIgnoreNullability(newTableSchema, dataSchema)) {
+    if (!PaimonUtils.sameType(newTableSchema, dataSchema)) {
       val resolve = sparkSession.sessionState.conf.resolver
       val cols = newTableSchema.map {
         field =>
@@ -129,11 +128,6 @@ case class WriteIntoPaimonTable(
     (dynamicPartitionOverwriteMode, overwritePartition)
   }

-  private def schemaEqualsIgnoreNullability(s1: StructType, s2: StructType): Boolean = {
-    def ignoreNullable(s: StructType) = StructType(s.fields.map(_.copy(nullable = true)))
-    ignoreNullable(s1) == ignoreNullable(s2)
-  }
-
   override def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): LogicalPlan =
     this.asInstanceOf[WriteIntoPaimonTable]
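For context (an illustration, not part of the commit): the removed helper relaxed nullability only on top-level fields, so two schemas that differed solely in the nullability of a nested field still compared as unequal, whereas Spark's DataType.sameType ignores nullability recursively (and field-name case when spark.sql.caseSensitive is false). A minimal sketch of the difference, assuming paimon-spark is on the classpath:

  import org.apache.spark.sql.PaimonUtils
  import org.apache.spark.sql.types._

  // Two schemas that differ only in the nullability of a nested field.
  val expected = StructType(Seq(
    StructField("s", StructType(Seq(StructField("x", IntegerType, nullable = true))))))
  val actual = StructType(Seq(
    StructField("s", StructType(Seq(StructField("x", IntegerType, nullable = false))))))

  // The removed helper: copies nullable = true onto top-level fields only,
  // so the nested difference still made the comparison fail.
  def schemaEqualsIgnoreNullability(s1: StructType, s2: StructType): Boolean = {
    def ignoreNullable(s: StructType) = StructType(s.fields.map(_.copy(nullable = true)))
    ignoreNullable(s1) == ignoreNullable(s2)
  }
  schemaEqualsIgnoreNullability(expected, actual) // false

  // The new path: DataType.sameType compares recursively, ignoring
  // nullability at every level of nesting.
  PaimonUtils.sameType(expected, actual)          // true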
PaimonUtils.scala

@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.translateFilterV2WithMapping
 import org.apache.spark.sql.internal.connector.PredicateUtils
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.util.PartitioningUtils
 import org.apache.spark.util.{Utils => SparkUtils}
@@ -121,4 +121,8 @@ object PaimonUtils {
       partitionColumnNames: Seq[String]): Unit = {
     PartitioningUtils.requireExactMatchedPartitionSpec(tableName, spec, partitionColumnNames)
   }
+
+  def sameType(left: DataType, right: DataType): Boolean = {
+    left.sameType(right)
+  }
 }
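Design note: DataType.sameType is package-private in Spark, so it cannot be called directly from org.apache.paimon code; that is presumably why the check is routed through this one-line wrapper. PaimonUtils is declared in the org.apache.spark.sql package (note the updated import in WriteIntoPaimonTable.scala above), which lets it re-expose Spark-internal helpers like this one to the rest of the Paimon codebase.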
