
Commit d81d20b

colinmjj authored and GitHub Enterprise committed
[HADP-54510] Validate the folder for dynamic partition to avoid data quality issue (apache#476)
1 parent 0907392 · commit d81d20b

2 files changed: +42 additions, −0 deletions


core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

Lines changed: 11 additions & 0 deletions
@@ -293,6 +293,17 @@ class HadoopMapReduceCommitProtocol(
     if (dynamicPartitionOverwrite) {
       val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _)
       logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
+      // To avoid a partial commit caused by a missing expected partition,
+      // check that all required partitions exist before renaming.
+      // Note that the validation and the renames are not atomic, so a
+      // partial commit is still possible.
+      for (part <- partitionPaths) {
+        if (!fs.exists(new Path(stagingDir, part))) {
+          throw new IOException(s"Failed to validate all required partitions exist when " +
+            s"committing files staged for overwriting dynamic partitions, " +
+            s"missed partition: $part")
+        }
+      }
       for (part <- partitionPaths) {
         val finalPartPath = new Path(path, part)
         if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
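
Taken on its own, the added guard is just an existence check over the staged partition directories before any rename happens. Below is a minimal standalone sketch of that logic; fs, stagingDir, and partitionPaths mirror the values the commit protocol already holds in the diff above, while the helper name validateStagedPartitions is hypothetical.

import java.io.IOException

import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch of the guard added above, extracted from the commit protocol.
// It throws before any staged directory is renamed into the final table
// location, so a missing partition fails the whole job instead of
// silently committing a subset of partitions.
def validateStagedPartitions(
    fs: FileSystem,
    stagingDir: Path,
    partitionPaths: Set[String]): Unit = {
  for (part <- partitionPaths) {
    if (!fs.exists(new Path(stagingDir, part))) {
      throw new IOException(s"Failed to validate all required partitions exist when " +
        s"committing files staged for overwriting dynamic partitions, " +
        s"missed partition: $part")
    }
  }
}

Because the check and the later renames are separate filesystem calls, the guard narrows the window for a partial commit but cannot eliminate it, as the new comments note.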

sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala

Lines changed: 31 additions & 0 deletions
@@ -2628,6 +2628,27 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
   }
 
+  test("HADP-54510: Validate the folder for dynamic partition to avoid data quality issue") {
+    withSQLConf(
+      "fs.file.impl" -> classOf[ExistsFromSparkStagingToFinalDirAlwaysTurnsFalseFilesystem].getName,
+      "fs.file.impl.disable.cache" -> "true",
+      SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
+
+      withTable("t") {
+        sql(
+          """
+            |create table t(i int, part1 int, part2 int) using parquet
+            |partitioned by (part1, part2)
+          """.stripMargin)
+
+        val e = intercept[IOException] {
+          sql("insert overwrite table t partition(part1, part2) values (1, 1, 1)")
+        }
+        assert(e.getMessage.contains("Failed to validate all required partitions"))
+      }
+    }
+  }
+
   test("SPARK-36980: Insert support query with CTE") {
     withTable("t") {
       sql("CREATE TABLE t(i int, part1 int, part2 int) using parquet")
@@ -2851,3 +2872,13 @@ class RenameFromSparkStagingToFinalDirAlwaysTurnsFalseFilesystem extends RawLocalFileSystem {
     path.toString.contains(".spark-staging-")
   }
 }
+
+class ExistsFromSparkStagingToFinalDirAlwaysTurnsFalseFilesystem extends RawLocalFileSystem {
+  override def exists(path: Path): Boolean = {
+    !isSparkStagingDir(path) && super.exists(path)
+  }
+
+  private def isSparkStagingDir(path: Path): Boolean = {
+    path.toString.contains(".spark-staging-")
+  }
+}
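
The test drives this path by swapping Hadoop's file:// scheme implementation for the fault-injecting filesystem above, whose exists() reports false for anything under a .spark-staging- directory. A minimal sketch of the same wiring against a bare Hadoop Configuration, with illustrative paths:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
// Replace the implementation backing the file:// scheme ...
conf.set("fs.file.impl",
  classOf[ExistsFromSparkStagingToFinalDirAlwaysTurnsFalseFilesystem].getName)
// ... and bypass the FileSystem cache so the override is actually used
// instead of a previously cached RawLocalFileSystem instance.
conf.setBoolean("fs.file.impl.disable.cache", true)

val fs = FileSystem.get(new Path("file:///tmp").toUri, conf)
// Any staging path now appears to be missing, so the commit-time
// validation throws before any rename takes place.
assert(!fs.exists(new Path("file:///tmp/.spark-staging-1/part1=1/part2=1")))

With these settings, the INSERT OVERWRITE in the test stages its output normally but fails validation at commit time, producing the "Failed to validate all required partitions" IOException the test asserts on.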
