diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index 537be46a50125..4626ad3fefe31 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -63,6 +63,18 @@
test-jar
test
+
+ org.apache.spark
+ spark-pipelines_${scala.binary.version}
+ ${project.version}
+
+
+ org.apache.spark
+ spark-pipelines_${scala.binary.version}
+ ${project.version}
+ test-jar
+ test
+
org.apache.spark
spark-catalyst_${scala.binary.version}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/pipelines/HiveMaterializeTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/pipelines/HiveMaterializeTablesSuite.scala
new file mode 100644
index 0000000000000..948eb3ab22304
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/pipelines/HiveMaterializeTablesSuite.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.pipelines
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, TableCatalog, TableChange, TableInfo}
+import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
+import org.apache.spark.sql.pipelines.graph.MaterializeTablesSuite
+import org.apache.spark.sql.test.TestSparkSession
+import org.apache.spark.sql.types.{IntegerType, StructType}
+
+class HiveMaterializeTablesSuite extends MaterializeTablesSuite {
+ test("super basic") {
+ val catalogManager = spark.sessionState.catalogManager
+ val catalog = catalogManager.currentCatalog.asInstanceOf[TableCatalog]
+ val identifier = Identifier.of(Array("default"), "test_table")
+ val outputSchema =
+ new StructType().add("a", IntegerType, true, "comment1")
+ catalog.createTable(
+ identifier,
+ new TableInfo.Builder()
+ .withProperties(Map.empty.asJava)
+ .withColumns(CatalogV2Util.structTypeToV2Columns(outputSchema))
+ .withPartitions(Array.empty)
+ .build()
+ )
+
+ catalog.alterTable(identifier, TableChange.updateColumnComment(Array("a"), "comment2"))
+ val table = catalog.loadTable(identifier)
+ assert(table.schema() == new StructType().add("a", IntegerType, true, "comment2"))
+ }
+
+ protected val hiveContext: TestHiveContext = TestHive
+
+ override def createSparkSession: TestSparkSession = TestHive.sparkSession
+
+ override def afterAll(): Unit = {
+ try {
+ hiveContext.reset()
+ } finally {
+ super.afterAll()
+ }
+ }
+
+ override def afterEach(): Unit = {
+ try {
+ spark.artifactManager.cleanUpResourcesForTesting()
+ } finally {
+ super.afterEach()
+ }
+ }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index a394d0b7393cc..c8c4c59422ec3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -45,6 +45,7 @@ import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf, WithTestConf}
import org.apache.spark.sql.internal.StaticSQLConf.{CATALOG_IMPLEMENTATION, RESULT_QUERY_STAGE_MAX_THREAD_THRESHOLD, SHUFFLE_EXCHANGE_MAX_THREAD_THRESHOLD, WAREHOUSE_PATH}
+import org.apache.spark.sql.test.TestSparkSession
import org.apache.spark.util.{ShutdownHookManager, Utils}
// SPARK-3729: Test key required to check for initialization errors with config.
@@ -181,7 +182,7 @@ private[hive] class TestHiveSparkSession(
@transient private val existingSharedState: Option[TestHiveSharedState],
@transient private val parentSessionState: Option[SessionState],
private val loadTestTables: Boolean)
- extends SparkSession(sc) with Logging { self =>
+ extends TestSparkSession(sc) with Logging { self =>
def this(sc: SparkContext, loadTestTables: Boolean) = {
this(
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala
index 79c5ef36b0bc4..20e3a4a46cd39 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala
@@ -178,12 +178,14 @@ object DatasetManager extends Logging {
}
// Wipe the data if we need to
- if ((isFullRefresh || !table.isStreamingTableOpt.get) && existingTableOpt.isDefined) {
- context.spark.sql(s"TRUNCATE TABLE ${table.identifier.quotedString}")
+ val dropTable = (isFullRefresh || !table.isStreamingTableOpt.get) && existingTableOpt.isDefined
+ if (dropTable) {
+ catalog.dropTable(identifier)
+// context.spark.sql(s"DROP TABLE ${table.identifier.quotedString}")
}
// Alter the table if we need to
- if (existingTableOpt.isDefined) {
+ if (existingTableOpt.isDefined && !dropTable) {
val existingSchema = existingTableOpt.get.schema()
val targetSchema = if (table.isStreamingTableOpt.get && !isFullRefresh) {
@@ -198,7 +200,7 @@ object DatasetManager extends Logging {
}
// Create the table if we need to
- if (existingTableOpt.isEmpty) {
+ if (dropTable || existingTableOpt.isEmpty) {
catalog.createTable(
identifier,
new TableInfo.Builder()
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala
index 2587f503222e8..8cf7f613db440 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala
@@ -446,8 +446,9 @@ class MaterializeTablesSuite extends BaseCoreExecutionTest {
val table2 = catalog.loadTable(identifier)
assert(
- table2.columns() sameElements CatalogV2Util
- .structTypeToV2Columns(new StructType().add("y", IntegerType).add("x", BooleanType))
+ table2.columns().toSet == CatalogV2Util
+ .structTypeToV2Columns(new StructType().add("x", BooleanType).add("y", IntegerType))
+ .toSet
)
assert(table2.partitioning().toSeq == Seq(Expressions.identity("x")))
@@ -468,8 +469,9 @@ class MaterializeTablesSuite extends BaseCoreExecutionTest {
val table3 = catalog.loadTable(identifier)
assert(
- table3.columns() sameElements CatalogV2Util
- .structTypeToV2Columns(new StructType().add("y", IntegerType).add("x", BooleanType))
+ table3.columns().toSet == CatalogV2Util
+ .structTypeToV2Columns(new StructType().add("x", BooleanType).add("y", IntegerType))
+ .toSet
)
assert(table3.partitioning().toSeq == Seq(Expressions.identity("x")))
}