Merged
Changes from 2 commits
1 change: 1 addition & 0 deletions README.md
@@ -1602,6 +1602,7 @@ The output looks like this:
| .option("redefine-segment-id-map:0", "REDEFINED_FIELD1 => SegmentId1,SegmentId2,...") | Specifies a mapping between redefined field names and segment id values. Each option specifies a mapping for a single segment. The numeric value for each mapping option must be incremented so the option keys are unique. |
| .option("segment-children:0", "COMPANY => EMPLOYEE,DEPARTMENT") | Specifies a mapping between segment redefined fields and their children. Each option specifies a mapping for a single parent field. The numeric value for each mapping option must be incremented so the option keys are unique. If such mapping is specified hierarchical record structure will be automatically reconstructed. This require `redefine-segment-id-map` to be set. |
| .option("enable_indexes", "true") | Turns on indexing of multisegment variable length files (on by default). |
| .option("enable_index_cache", "false") | When true, calculated indexes are cached in memory for later use. This improves performance of processing when same files are processed more than once. |
| .option("input_split_records", 50000) | Specifies how many records will be allocated to each split/partition. It will be processed by Spark tasks. (The default is not set and the split will happen according to size, see the next option) |
| .option("input_split_size_mb", 100) | Specify how many megabytes to allocate to each partition/split. (The default is 100 MB) |

@@ -119,6 +119,7 @@ object CobolParametersParser extends Logging {

// Indexed multisegment file processing
val PARAM_ENABLE_INDEXES = "enable_indexes"
val PARAM_ENABLE_INDEX_CACHE = "enable_index_cache"
val PARAM_INPUT_SPLIT_RECORDS = "input_split_records"
val PARAM_INPUT_SPLIT_SIZE_MB = "input_split_size_mb"
val PARAM_SEGMENT_ID_PREFIX = "segment_id_prefix"
@@ -381,6 +382,7 @@ object CobolParametersParser extends Logging {
fileEndOffset = 0,
generateRecordId = false,
isUsingIndex = false,
isIndexCachingAllowed = false,
inputSplitRecords = None,
inputSplitSizeMB = None,
improveLocality = false,
@@ -416,6 +418,7 @@ object CobolParametersParser extends Logging {
isRdwPartRecLength = varLenParams.isRdwPartRecLength,
rdwAdjustment = varLenParams.rdwAdjustment,
isIndexGenerationNeeded = varLenParams.isUsingIndex,
isIndexCachingAllowed = varLenParams.isIndexCachingAllowed,
inputSplitRecords = varLenParams.inputSplitRecords,
inputSplitSizeMB = varLenParams.inputSplitSizeMB,
hdfsDefaultBlockSize = defaultBlockSize,
@@ -502,6 +505,7 @@ object CobolParametersParser extends Logging {
fileEndOffset,
isRecordIdGenerationEnabled,
params.getOrElse(PARAM_ENABLE_INDEXES, "true").toBoolean,
params.getOrElse(PARAM_ENABLE_INDEX_CACHE, "false").toBoolean,
params.get(PARAM_INPUT_SPLIT_RECORDS).map(v => v.toInt),
params.get(PARAM_INPUT_SPLIT_SIZE_MB).map(v => v.toInt),
params.getOrElse(PARAM_IMPROVE_LOCALITY, "true").toBoolean,
@@ -99,6 +99,7 @@ case class ReaderParameters(
isRdwPartRecLength: Boolean = false,
rdwAdjustment: Int = 0,
isIndexGenerationNeeded: Boolean = false,
isIndexCachingAllowed: Boolean = false,
inputSplitRecords: Option[Int] = None,
inputSplitSizeMB: Option[Int] = None,
hdfsDefaultBlockSize: Option[Int] = None,
@@ -34,6 +34,7 @@ package za.co.absa.cobrix.cobol.reader.parameters
* @param fileEndOffset A number of bytes to skip at the end of each file
* @param generateRecordId Generate a sequential record number for each record to be able to retain the order of the original data
* @param isUsingIndex Whether indexing of the input file before processing is requested
* @param isIndexCachingAllowed Whether caching of the generated indexes is allowed
* @param inputSplitSizeMB A partition size to target. In certain circumstances the actual size may differ, but the library will make a best effort to target that size
* @param inputSplitRecords The number of records to include in each partition. Note that mainframe records may have variable size; inputSplitSizeMB is the recommended option
* @param improveLocality Tries to improve locality by extracting preferred locations for variable-length records
@@ -56,6 +57,7 @@ case class VariableLengthParameters(
fileEndOffset: Int,
generateRecordId: Boolean,
isUsingIndex: Boolean,
isIndexCachingAllowed: Boolean,
inputSplitRecords: Option[Int],
inputSplitSizeMB: Option[Int],
improveLocality: Boolean,
@@ -218,7 +218,7 @@ object SparkCobolProcessor {
case reader: VarLenReader if reader.isIndexGenerationNeeded && allowIndexes =>
val orderedFiles = CobolRelation.getListFilesWithOrder(listOfFiles, spark.sqlContext, isRecursiveRetrieval = false)
val filesMap = orderedFiles.map(fileWithOrder => (fileWithOrder.order, fileWithOrder.filePath)).toMap
val indexes: RDD[SparseIndexEntry] = IndexBuilder.buildIndex(orderedFiles, cobolReader, spark.sqlContext)(LocalityParameters(improveLocality = false, optimizeAllocation = false))
val indexes: RDD[SparseIndexEntry] = IndexBuilder.buildIndex(orderedFiles, cobolReader, spark.sqlContext, readerParameters.isIndexCachingAllowed)(LocalityParameters(improveLocality = false, optimizeAllocation = false))

indexes.flatMap(indexEntry => {
val filePathName = filesMap(indexEntry.fileId)
@@ -65,15 +65,16 @@ class SerializableConfiguration(@transient var value: Configuration) extends Ser
class CobolRelation(sourceDirs: Seq[String],
cobolReader: Reader,
localityParams: LocalityParameters,
debugIgnoreFileSize: Boolean
)(@transient val sqlContext: SQLContext)
debugIgnoreFileSize: Boolean,
indexCachingAllowed: Boolean)
(@transient val sqlContext: SQLContext)
extends BaseRelation
with Serializable
with TableScan {

private val filesList = CobolRelation.getListFilesWithOrder(sourceDirs, sqlContext, isRecursiveRetrieval)

private lazy val indexes: RDD[SparseIndexEntry] = IndexBuilder.buildIndex(filesList, cobolReader, sqlContext)(localityParams)
private lazy val indexes: RDD[SparseIndexEntry] = IndexBuilder.buildIndex(filesList, cobolReader, sqlContext, indexCachingAllowed)(localityParams)

override def schema: StructType = {
cobolReader.getSparkSchema
@@ -58,11 +58,16 @@ class DefaultSource

val cobolParameters = CobolParametersParser.parse(new Parameters(parameters))
CobolParametersValidator.checkSanity(cobolParameters)
val indexCachingAllowed = cobolParameters.variableLengthParams match {
case Some(varLenParams) => varLenParams.isIndexCachingAllowed
case None => false
}

new CobolRelation(cobolParameters.sourcePaths,
buildEitherReader(sqlContext.sparkSession, cobolParameters),
LocalityParameters.extract(cobolParameters),
cobolParameters.debugIgnoreFileSize)(sqlContext)
cobolParameters.debugIgnoreFileSize,
indexCachingAllowed)(sqlContext)
}

/** Writer relation */
@@ -34,6 +34,7 @@ import za.co.absa.cobrix.spark.cobol.source.streaming.FileStreamer
import za.co.absa.cobrix.spark.cobol.source.types.FileWithOrder
import za.co.absa.cobrix.spark.cobol.utils.{HDFSUtils, SparkUtils}

import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ArrayBuffer

/**
@@ -45,18 +46,24 @@ import scala.collection.mutable.ArrayBuffer
* In a nutshell, ideally, there will be as many partitions as there are indexes.
*/
private[cobol] object IndexBuilder extends Logging {
private val indexCache = new ConcurrentHashMap[String, Array[SparseIndexEntry]]()

⚠️ Potential issue | 🟠 Major

Add cache eviction policy and file modification tracking.

The global indexCache has no eviction policy and will grow unboundedly as new files are processed, potentially causing memory exhaustion in long-running Spark applications. Additionally, the cache is keyed solely by file path without tracking file modification times, so if a file is modified, stale cached indexes will be returned.

Consider these improvements:

  1. Add a bounded cache with LRU eviction (e.g., using Guava's CacheBuilder or Caffeine)
  2. Include file modification timestamp or checksum in the cache key
  3. Add a configurable maximum cache size or TTL
  4. Consider adding cache statistics logging

Example with size-bounded cache:

+import com.google.common.cache.{Cache, CacheBuilder}
+import java.util.concurrent.TimeUnit
+
-private val indexCache = new ConcurrentHashMap[String, Array[SparseIndexEntry]]()
+private val indexCache: Cache[String, Array[SparseIndexEntry]] = CacheBuilder.newBuilder()
+  .maximumSize(1000) // configurable
+  .expireAfterAccess(1, TimeUnit.HOURS) // configurable
+  .build[String, Array[SparseIndexEntry]]()

Then update get/put operations accordingly.
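
A minimal sketch of what the updated get/put path could look like, assuming a Guava cache as above and a key that combines the file path with its HDFS modification time (the helper names are hypothetical, not from the PR):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical helper: key a file by path plus modification time, so a modified file misses the cache.
private def indexCacheKey(filePath: String, conf: Configuration): String = {
  val path = new Path(filePath)
  val modTime = path.getFileSystem(conf).getFileStatus(path).getModificationTime
  s"$filePath@$modTime"
}

// Lookup: returns None for new files and for files changed since they were indexed.
private def getCachedIndex(filePath: String, conf: Configuration): Option[Array[SparseIndexEntry]] =
  Option(indexCache.getIfPresent(indexCacheKey(filePath, conf)))

// Store: entries for older versions of the same path age out via the size/TTL eviction policy.
private def putCachedIndex(filePath: String, conf: Configuration, entries: Array[SparseIndexEntry]): Unit =
  indexCache.put(indexCacheKey(filePath, conf), entries)
```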

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala
around line 49, the global indexCache is an unbounded ConcurrentHashMap which
can grow indefinitely and does not account for file modifications; replace it
with a bounded LRU/TTL cache (e.g., Guava CacheBuilder or Caffeine) configured
via a new configurable max size and optional TTL, and change cache keys to
include file modification timestamp (or checksum) so modified files miss the
cache; update all get/put usages to the new cache API, add configuration hooks
for size/TTL, and optionally log basic cache statistics (hits/evictions) for
monitoring.


def buildIndex(filesList: Array[FileWithOrder],
cobolReader: Reader,
sqlContext: SQLContext)
sqlContext: SQLContext,
cachingAllowed: Boolean)
(localityParams: LocalityParameters): RDD[SparseIndexEntry] = {
val fs = new Path(filesList.head.filePath).getFileSystem(sqlContext.sparkSession.sparkContext.hadoopConfiguration)

cobolReader match {
case reader: VarLenReader if reader.isIndexGenerationNeeded && localityParams.improveLocality && isDataLocalitySupported(fs) =>
logger.info("Building indexes with data locality...")
buildIndexForVarLenReaderWithFullLocality(filesList, reader, sqlContext, localityParams.optimizeAllocation)
case reader: VarLenReader =>
buildIndexForVarLenReader(filesList, reader, sqlContext)
logger.info("Building indexes for variable record length input files...")
buildIndexForVarLenReader(filesList, reader, sqlContext, cachingAllowed)
case _ =>
logger.info("Generating indexes for full files...")
buildIndexForFullFiles(filesList, sqlContext)
}
}
@@ -112,24 +119,58 @@ private[cobol] object IndexBuilder extends Logging {
*/
private[cobol] def buildIndexForVarLenReader(filesList: Array[FileWithOrder],
reader: VarLenReader,
sqlContext: SQLContext): RDD[SparseIndexEntry] = {
sqlContext: SQLContext,
cachingAllowed: Boolean): RDD[SparseIndexEntry] = {
val conf = sqlContext.sparkContext.hadoopConfiguration
val sconf = new SerializableConfiguration(conf)

if (reader.getReaderProperties.enableSelfChecks && filesList.nonEmpty) {
selfCheckForIndexCompatibility(reader, filesList.head.filePath, conf)
// Splitting the input into files for which indexes are cached and files for which they are not
val cachedFiles = if (cachingAllowed) {
filesList.filter(f => indexCache.containsKey(f.filePath))
} else {
Array.empty[FileWithOrder]
}

val filesRDD = sqlContext.sparkContext.parallelize(filesList, filesList.length)
val nonCachedFiles = filesList.diff(cachedFiles)

val indexRDD = filesRDD.mapPartitions(
partition => {
partition.flatMap(row => {
generateIndexEntry(row, sconf.value, reader)
})
}).cache()
// Getting indexes for files for which indexes are not in the cache
val newIndexes = if (nonCachedFiles.length > 0) {
if (reader.getReaderProperties.enableSelfChecks) {
selfCheckForIndexCompatibility(reader, nonCachedFiles.head.filePath, conf)
}

repartitionIndexes(indexRDD)
val filesRDD = sqlContext.sparkContext.parallelize(nonCachedFiles, nonCachedFiles.length)
filesRDD.mapPartitions(
partition => {
partition.flatMap(row => {
generateIndexEntry(row, sconf.value, reader)
})
}).collect()
} else {
Array.empty[SparseIndexEntry]
}

// Storing new indexes in the cache
if (cachingAllowed && newIndexes.length > 0) {
newIndexes.groupBy(_.fileId).foreach { case (fileId, indexEntries) =>
val filePathOpt = filesList.find(_.order == fileId).map(_.filePath)

filePathOpt.foreach { filePath =>
logger.info(s"Index stored to cache for file: $filePath.")
indexCache.put(filePath, indexEntries.sortBy(_.offsetFrom))
}
}
}

// Getting indexes for files for which indexes are in the cache
val cachedIndexes = cachedFiles.flatMap { f =>
logger.info("Index fetched from cache for file: " + f.filePath)
indexCache.get(f.filePath)
.map(ind => ind.copy(fileId = f.order))
}

// Creating the final RDD with all indexes
createIndexRDD(cachedIndexes ++ newIndexes, sqlContext)
}

/**
@@ -336,4 +377,13 @@ private[cobol] object IndexBuilder extends Logging {
logger.info(s"Index elements count: $indexCount, number of partitions = $numPartitions")
indexRDD.repartition(numPartitions).cache()
}

private def createIndexRDD(indexes: Array[SparseIndexEntry], sqlContext: SQLContext): RDD[SparseIndexEntry] = {
val indexCount = indexes.length

val numPartitions = Math.min(indexCount, Constants.maxNumPartitions)
logger.info(s"Index elements count: ${indexes.length}, number of partitions = $numPartitions")

sqlContext.sparkContext.parallelize(indexes, numPartitions)
}
}
@@ -83,6 +83,7 @@ class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long =
if (numberOfBytes <= 0) {
new Array[Byte](0)
} else if (actualBytesToRead <=0 || bufferedStream == null || bufferedStream.isClosed) {
logger.info(s"End of stream reached: Requested $numberOfBytes bytes, reached offset $byteIndex.")
close()
new Array[Byte](0)
} else {
Expand All @@ -97,7 +98,7 @@ class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long =
if (readBytes == numberOfBytes) {
buffer
} else {
logger.warn(s"End of stream reached: Requested $numberOfBytes bytes, received $readBytes.")
logger.info(s"End of stream reached: Requested $numberOfBytes bytes, received $readBytes.")
close()
if (readBytes == actualBytesToRead) {
buffer
@@ -64,7 +64,8 @@ class CobolRelationSpec extends SparkCobolTestBase with Serializable {
val relation = new CobolRelation(Seq(copybookFile.getParentFile.getAbsolutePath),
testReader,
localityParams = localityParams,
debugIgnoreFileSize = false)(sqlContext)
debugIgnoreFileSize = false,
indexCachingAllowed = false)(sqlContext)
val cobolData: RDD[Row] = relation.parseRecords(testReader, oneRowRDD)

val cobolDataFrame = sqlContext.createDataFrame(cobolData, sparkSchema)
@@ -88,7 +89,8 @@ class CobolRelationSpec extends SparkCobolTestBase with Serializable {
val relation = new CobolRelation(Seq(copybookFile.getParentFile.getAbsolutePath),
testReader,
localityParams = localityParams,
debugIgnoreFileSize = false)(sqlContext)
debugIgnoreFileSize = false,
indexCachingAllowed = false)(sqlContext)

val caught = intercept[Exception] {
relation.parseRecords(testReader, oneRowRDD).collect()
@@ -103,7 +105,8 @@ class CobolRelationSpec extends SparkCobolTestBase with Serializable {
val relation = new CobolRelation(Seq(copybookFile.getParentFile.getAbsolutePath),
testReader,
localityParams = localityParams,
debugIgnoreFileSize = false)(sqlContext)
debugIgnoreFileSize = false,
indexCachingAllowed = false)(sqlContext)

val caught = intercept[SparkException] {
relation.parseRecords(testReader, oneRowRDD).collect()
@@ -64,7 +64,7 @@ class IndexBuilderSpec extends AnyWordSpec with BinaryFileFixture with SparkTest

val localityParameters = LocalityParameters(improveLocality = true, optimizeAllocation = true)

val index = IndexBuilder.buildIndex(filesWithOrder, reader, spark.sqlContext)(localityParameters).collect()
val index = IndexBuilder.buildIndex(filesWithOrder, reader, spark.sqlContext, cachingAllowed = false)(localityParameters).collect()

assert(index.length == 3)
}
@@ -86,7 +89,7 @@ class IndexBuilderSpec extends AnyWordSpec with BinaryFileFixture with SparkTest

val localityParameters = LocalityParameters(improveLocality = false, optimizeAllocation = false)

val index = IndexBuilder.buildIndex(filesWithOrder, reader, spark.sqlContext)(localityParameters).collect()
val index = IndexBuilder.buildIndex(filesWithOrder, reader, spark.sqlContext, cachingAllowed = false)(localityParameters).collect()

assert(index.length == 3)
}
@@ -104,7 +104,7 @@ class IndexBuilderSpec extends AnyWordSpec with BinaryFileFixture with SparkTest

val localityParameters = LocalityParameters(improveLocality = false, optimizeAllocation = false)

val index = IndexBuilder.buildIndex(filesWithOrder, reader, spark.sqlContext)(localityParameters).collect()
val index = IndexBuilder.buildIndex(filesWithOrder, reader, spark.sqlContext, cachingAllowed = false)(localityParameters).collect()

assert(index.length == 1)
}
@@ -168,7 +168,7 @@ class IndexBuilderSpec extends AnyWordSpec with BinaryFileFixture with SparkTest

val reader = new VarLenNestedReader(Seq(copybook), readerParameters)

val index = IndexBuilder.buildIndexForVarLenReader(filesWithOrder, reader, spark.sqlContext).collect()
val index = IndexBuilder.buildIndexForVarLenReader(filesWithOrder, reader, spark.sqlContext, cachingAllowed = false).collect()

assert(index.length == 3)
}
@@ -188,7 +188,7 @@ class IndexBuilderSpec extends AnyWordSpec with BinaryFileFixture with SparkTest

val reader = new VarLenNestedReader(Seq(copybook), readerParameters)

val index = IndexBuilder.buildIndexForVarLenReader(filesWithOrder, reader, spark.sqlContext).collect()
val index = IndexBuilder.buildIndexForVarLenReader(filesWithOrder, reader, spark.sqlContext, cachingAllowed = false).collect()

assert(index.length == 2)
}