@@ -17,10 +17,15 @@
 
 package org.apache.spark.sql.hive
 
+import java.io.File
 import java.time.{Duration, Period}
 import java.time.temporal.ChronoUnit
 
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetFileReader
+
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetTest}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
@@ -179,4 +184,46 @@ class HiveParquetSuite extends QueryTest
       }
     }
   }
+
+  test("SPARK-52574: Ensure compression codec is correctly applied in Hive tables and dirs") {
+    withSQLConf(
+      HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false",
+      HiveUtils.CONVERT_METASTORE_INSERT_DIR.key -> "false",
+      SQLConf.PARQUET_COMPRESSION.key -> ParquetCompressionCodec.SNAPPY.lowerCaseName()) {
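+      // With conversion disabled, writes go through the Hive SerDe path, which
+      // must still honor the session's Parquet compression codec.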
+      withTable("tbl") {
+        sql("CREATE TABLE tbl(id int) STORED AS PARQUET")
+        sql("INSERT INTO tbl SELECT id FROM range(10)")
+        val tblMeta = spark.sessionState.catalog.getTableMetadata(TableIdentifier("tbl"))
+        checkCompressionCodec(new File(tblMeta.storage.locationUri.get))
+      }
+
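+      // Same check for the INSERT OVERWRITE [LOCAL] DIRECTORY ... STORED AS parquet path.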
+      withTempPath { dir =>
+        sql(
+          s"""
+             |INSERT OVERWRITE LOCAL DIRECTORY '${dir.getCanonicalPath}'
+             |STORED AS parquet
+             |SELECT id FROM range(10)
+             |""".stripMargin)
+        checkCompressionCodec(dir)
+      }
+    }
+
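+    // Reads one output file's footer and asserts its column chunks were written with SNAPPY.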
+    def checkCompressionCodec(dir: File): Unit = {
+      val parquetFiles = dir.listFiles().filter(_.getName.startsWith("part-"))
+      assert(parquetFiles.nonEmpty, "No Parquet files found")
+
+      val conf = spark.sessionState.newHadoopConf()
+      val file = parquetFiles.head
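+      // The footer alone suffices: Parquet records the codec per column chunk.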
+      val footer = ParquetFileReader.readFooter(conf, new Path(file.getAbsolutePath))
+
+      val codec = footer.getBlocks.get(0).getColumns.get(0).getCodec.name()
+      assert(codec.equalsIgnoreCase(ParquetCompressionCodec.SNAPPY.lowerCaseName()),
+        s"Expected ${ParquetCompressionCodec.SNAPPY.lowerCaseName()} compression but found $codec")
+    }
+  }
 }