Skip to content

Commit ca0e362

Browse files
committed
Use MetadataLogFileIndex to get latest version in AddDateVersionTransformer
1 parent 7524001 commit ca0e362

File tree

4 files changed

+27
-14
lines changed

4 files changed

+27
-14
lines changed

compatibility-api/src/main/scala/za/co/absa/hyperdrive/compatibility/api/CompatibleSparkUtil.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@ package za.co.absa.hyperdrive.compatibility.api
1717

1818
import org.apache.spark.sql.SparkSession
1919
import org.apache.spark.sql.execution.streaming.MetadataLogFileIndex
20+
import org.apache.spark.sql.types.StructType
2021

2122
trait CompatibleSparkUtil {
22-
def createMetadataLogFileIndex(spark: SparkSession, destination: String): MetadataLogFileIndex
23+
def createMetadataLogFileIndex(spark: SparkSession, destination: String, userSpecifiedSchema: Option[StructType]): MetadataLogFileIndex
2324
def hasMetadata(spark: SparkSession, destination: String): Boolean
2425
def jsonStringToObject(jsonString: String): Object
2526
def objectToJsonString(obj: Object): Option[String]

compatibility-provider/src/main/scala/za/co/absa/hyperdrive/compatibility/provider/CompatibleSparkUtilProvider.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@ package za.co.absa.hyperdrive.compatibility.provider
1717

1818
import org.apache.spark.sql.SparkSession
1919
import org.apache.spark.sql.execution.streaming.MetadataLogFileIndex
20+
import org.apache.spark.sql.types.StructType
2021
import za.co.absa.hyperdrive.compatibility.api.CompatibleSparkUtil
2122
import za.co.absa.hyperdrive.compatibility.impl.SparkUtil
2223

2324
object CompatibleSparkUtilProvider extends CompatibleSparkUtil {
24-
def createMetadataLogFileIndex(spark: SparkSession, destination: String): MetadataLogFileIndex =
25-
SparkUtil.createMetadataLogFileIndex(spark, destination)
25+
def createMetadataLogFileIndex(spark: SparkSession, destination: String, userSpecifiedSchema: Option[StructType]): MetadataLogFileIndex =
26+
SparkUtil.createMetadataLogFileIndex(spark, destination, userSpecifiedSchema)
2627

2728
def hasMetadata(spark: SparkSession, destination: String): Boolean =
2829
SparkUtil.hasMetadata(spark, destination)

compatibility_spark-3/src/main/scala/za/co/absa/hyperdrive/compatibility/impl/SparkUtil.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,16 @@ import org.apache.avro.util.internal.JacksonUtils
2020
import org.apache.hadoop.fs.Path
2121
import org.apache.spark.sql.SparkSession
2222
import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}
23+
import org.apache.spark.sql.types.StructType
2324
import za.co.absa.hyperdrive.compatibility.api.CompatibleSparkUtil
2425

2526
import java.io.ByteArrayOutputStream
2627

2728
object SparkUtil extends CompatibleSparkUtil {
2829
private lazy val objectMapper = new ObjectMapper()
2930

30-
override def createMetadataLogFileIndex(spark: SparkSession, destination: String): MetadataLogFileIndex =
31-
new MetadataLogFileIndex(spark, new Path(destination), Map.empty, None)
31+
override def createMetadataLogFileIndex(spark: SparkSession, destination: String, userSpecifiedSchema: Option[StructType]): MetadataLogFileIndex =
32+
new MetadataLogFileIndex(spark, new Path(destination), Map.empty, userSpecifiedSchema)
3233

3334
override def hasMetadata(spark: SparkSession, destination: String): Boolean =
3435
FileStreamSink.hasMetadata(Seq(destination), spark.sparkContext.hadoopConfiguration, spark.sessionState.conf)

ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformer.scala

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,21 @@
1616
package za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion
1717

1818
import org.apache.commons.configuration2.Configuration
19+
import org.apache.spark.sql.catalyst.InternalRow
20+
import org.apache.spark.sql.execution.streaming.MetadataLogFileIndex
1921
import org.slf4j.LoggerFactory
2022
import org.apache.spark.sql.functions.{lit, to_date}
23+
import org.apache.spark.sql.types.StructType
2124
import org.apache.spark.sql.{DataFrame, SparkSession}
2225
import za.co.absa.hyperdrive.compatibility.provider.CompatibleSparkUtilProvider
2326
import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformer, StreamTransformerFactory}
2427
import za.co.absa.hyperdrive.ingestor.api.utils.ConfigUtils.getOrThrow
2528
import za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter
29+
import org.apache.spark.sql.types.{DateType, StructField}
2630

2731
import java.time.LocalDate
2832
import java.time.format.DateTimeFormatter
33+
import org.apache.spark.sql.types._
2934

3035
private[transformer] class AddDateVersionTransformer(val reportDate: String, val destination: String) extends StreamTransformer {
3136

@@ -44,22 +49,27 @@ private[transformer] class AddDateVersionTransformer(val reportDate: String, val
4449
if (noCommittedParquetFilesExist(spark)) {
4550
initialVersion
4651
} else {
47-
import spark.implicits._
48-
val df = spark.read.parquet(destination)
49-
val versions = df.select(df(ColumnVersion))
50-
.filter(df(ColumnDate) === lit(reportDate))
51-
.distinct()
52-
.as[Int]
53-
.collect().toList
54-
52+
val versions = getVersions(spark, ColumnDate, ColumnVersion, reportDate)
5553
if (versions.nonEmpty) versions.max + 1 else initialVersion
5654
}
5755
}
5856

5957
private def noCommittedParquetFilesExist(spark: SparkSession): Boolean = {
60-
val fileCatalog = CompatibleSparkUtilProvider.createMetadataLogFileIndex(spark, destination)
58+
val fileCatalog = CompatibleSparkUtilProvider.createMetadataLogFileIndex(spark, destination, None)
6159
!CompatibleSparkUtilProvider.hasMetadata(spark, destination) || fileCatalog.allFiles().isEmpty
6260
}
61+
62+
private def getVersions(spark: SparkSession, ColumnDate: String, ColumnVersion: String, reportDate: String): Seq[Int] = {
63+
val fileCatalog: MetadataLogFileIndex = CompatibleSparkUtilProvider.createMetadataLogFileIndex(spark, destination, Some(StructType(Seq(
64+
StructField(ColumnDate, StringType, nullable = true),
65+
StructField(ColumnVersion, IntegerType, nullable = true)
66+
))))
67+
68+
fileCatalog.partitionSpec().partitions.map { partition =>
69+
val row: InternalRow = partition.values
70+
(row.get(0, DateType), row.getInt(1))
71+
}.filter { case (date, _) => date.toString == reportDate }.map { case (_, version) => version }.toList
72+
}
6373
}
6474

6575
object AddDateVersionTransformer extends StreamTransformerFactory with AddDateVersionTransformerAttributes {

0 commit comments

Comments (0)