diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala index ced1cfd36cb78..08123035c66a3 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala @@ -178,12 +178,13 @@ object DatasetManager extends Logging { } // Wipe the data if we need to - if ((isFullRefresh || !table.isStreamingTable) && existingTableOpt.isDefined) { - context.spark.sql(s"TRUNCATE TABLE ${table.identifier.quotedString}") + val dropTable = (isFullRefresh || !table.isStreamingTable) && existingTableOpt.isDefined + if (dropTable) { + catalog.dropTable(identifier) } // Alter the table if we need to - if (existingTableOpt.isDefined) { + if (existingTableOpt.isDefined && !dropTable) { val existingSchema = existingTableOpt.get.schema() val targetSchema = if (table.isStreamingTable && !isFullRefresh) { @@ -198,7 +199,7 @@ object DatasetManager extends Logging { } // Create the table if we need to - if (existingTableOpt.isEmpty) { + if (dropTable || existingTableOpt.isEmpty) { catalog.createTable( identifier, new TableInfo.Builder() diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala index b1c6fe79c0e42..1b0b0df19ffbb 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala @@ -435,7 +435,7 @@ class MaterializeTablesSuite extends BaseCoreExecutionTest { val table2 = catalog.loadTable(identifier) assert( table2.columns() sameElements CatalogV2Util - .structTypeToV2Columns(new StructType().add("y", IntegerType).add("x", BooleanType)) + .structTypeToV2Columns(new StructType().add("x", BooleanType).add("y", IntegerType)) ) assert(table2.partitioning().toSeq == Seq(Expressions.identity("x"))) @@ -456,8 +456,8 @@ class MaterializeTablesSuite extends BaseCoreExecutionTest { val table3 = catalog.loadTable(identifier) assert( - table3.columns() sameElements CatalogV2Util - .structTypeToV2Columns(new StructType().add("y", IntegerType).add("x", BooleanType)) + table3.columns() sameElements CatalogV2Util + .structTypeToV2Columns(new StructType().add("x", BooleanType).add("y", IntegerType)) ) assert(table3.partitioning().toSeq == Seq(Expressions.identity("x"))) }