Skip to content

Commit 7bdcb22

Browse files
authored
MINOR: Refactor & cleanup for RemoteIndexCache (#13936)
- Add new unit tests. - Change the on-disk filename from <offset>_<uuid>_.<indexSuffix> to <offset>_<uuid>.<indexSuffix>, i.e. remove the trailing underscore after <uuid>. - Fix a small bug where we were parsing the offset as Int when reading the file name from disk; the offset is a Long. - Perform input validation in RemoteLogSegmentMetadata. - Remove an extra loop in the cleaner thread; ShutdownableThread already performs the looping. Reviewers: Jorge Esteban Quilcate Otoya <[email protected]>, Satish Duggana <[email protected]>
1 parent 14a97fa commit 7bdcb22

File tree

6 files changed

+215
-67
lines changed

6 files changed

+215
-67
lines changed

core/src/main/java/kafka/log/remote/RemoteLogManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,7 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
573573
String logFileName = logFile.getName();
574574

575575
logger.info("Copying {} to remote storage.", logFileName);
576-
RemoteLogSegmentId id = new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
576+
RemoteLogSegmentId id = RemoteLogSegmentId.generateNew(topicIdPartition);
577577

578578
long endOffset = nextSegmentBaseOffset - 1;
579579
File producerStateSnapshotFile = log.producerStateManager().fetchSnapshot(nextSegmentBaseOffset).orElse(null);

core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala

Lines changed: 107 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package kafka.log.remote
1818

1919
import com.github.benmanes.caffeine.cache.{Cache, Caffeine, RemovalCause}
2020
import kafka.log.UnifiedLog
21-
import kafka.log.remote.RemoteIndexCache.{DirName, remoteLogIndexCacheCleanerThread}
21+
import kafka.log.remote.RemoteIndexCache.{DirName, offsetFromRemoteIndexFileName, RemoteLogIndexCacheCleanerThread, remoteLogSegmentIdFromRemoteIndexFileName, remoteOffsetIndexFile, remoteTimeIndexFile, remoteTransactionIndexFile}
2222
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
2323
import kafka.utils.{CoreUtils, Logging, threadsafe}
2424
import org.apache.kafka.common.Uuid
@@ -38,7 +38,60 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
3838
object RemoteIndexCache {
3939
val DirName = "remote-log-index-cache"
4040
val TmpFileSuffix = ".tmp"
41-
val remoteLogIndexCacheCleanerThread = "remote-log-index-cleaner"
41+
val RemoteLogIndexCacheCleanerThread = "remote-log-index-cleaner"
42+
43+
/**
 * Extracts the remote log segment id from the name of an on-disk remote index file.
 *
 * The id is the text between the first underscore and the first dot of the name, i.e. the
 * `<uuid>` part of `<offset>_<uuid>.<indexSuffix>`.
 *
 * @param fileName name (not path) of the remote index file
 * @return the [[Uuid]] parsed from the file name
 * @throws IllegalArgumentException if the name does not contain an underscore followed by a dot
 */
def remoteLogSegmentIdFromRemoteIndexFileName(fileName: String): Uuid = {
  val underscoreIndex = fileName.indexOf('_')
  val dotIndex = fileName.indexOf('.')
  // Fail with a descriptive message instead of an opaque StringIndexOutOfBoundsException
  // (or a mis-sliced id) when the name does not follow the expected layout.
  if (underscoreIndex < 0 || dotIndex <= underscoreIndex)
    throw new IllegalArgumentException(
      s"Invalid remote index file name '$fileName', expected <offset>_<uuid>.<indexSuffix>")
  Uuid.fromString(fileName.substring(underscoreIndex + 1, dotIndex))
}
48+
49+
/**
 * Extracts the segment base offset from the name of an on-disk remote index file.
 *
 * The offset is the text before the first underscore, i.e. the `<offset>` part of
 * `<offset>_<uuid>.<indexSuffix>`. It is parsed as a Long.
 *
 * @param fileName name (not path) of the remote index file
 * @return the offset encoded at the start of the file name
 * @throws IllegalArgumentException if the name contains no underscore
 * @throws NumberFormatException if the text before the first underscore is not a valid Long
 */
def offsetFromRemoteIndexFileName(fileName: String): Long = {
  val underscoreIndex = fileName.indexOf('_')
  // Without this guard a missing separator surfaces as substring(0, -1), i.e. an opaque
  // StringIndexOutOfBoundsException that does not mention the offending file name.
  if (underscoreIndex < 0)
    throw new IllegalArgumentException(
      s"Invalid remote index file name '$fileName', expected <offset>_<uuid>.<indexSuffix>")
  fileName.substring(0, underscoreIndex).toLong
}
52+
53+
/**
 * Generates the file name prefix for the on-disk representation of remote indexes.
 *
 * An example prefix is `45_dsdsd`, where `45` is the base offset of the segment and
 * `dsdsd` is the unique id of its [[RemoteLogSegmentId]].
 *
 * @param remoteLogSegmentMetadata remote segment for the remote indexes
 * @return string which should be used as prefix for on-disk representation of remote indexes
 */
private def generateFileNamePrefixForIndex(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): String = {
  val startOffset = remoteLogSegmentMetadata.startOffset
  val segmentId = remoteLogSegmentMetadata.remoteLogSegmentId().id
  // uuid.toString uses URL encoding which is safe for filenames and URLs.
  s"${startOffset}_$segmentId"
}
68+
69+
/**
 * Builds the offset index file handle for a remote segment inside `dir`.
 *
 * @param dir directory that holds the index file
 * @param remoteLogSegmentMetadata remote segment the index belongs to
 * @return a [[File]] in `dir` named by [[remoteOffsetIndexFileName]]
 */
def remoteOffsetIndexFile(dir: File, remoteLogSegmentMetadata: RemoteLogSegmentMetadata): File =
  new File(dir, remoteOffsetIndexFileName(remoteLogSegmentMetadata))
72+
73+
/**
 * Computes the on-disk file name of the offset index for a remote segment.
 *
 * @param remoteLogSegmentMetadata remote segment the index belongs to
 * @return the index file name prefix for the segment followed by `UnifiedLog.IndexFileSuffix`
 */
def remoteOffsetIndexFileName(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): String =
  generateFileNamePrefixForIndex(remoteLogSegmentMetadata) + UnifiedLog.IndexFileSuffix
77+
78+
/**
 * Builds the time index file handle for a remote segment inside `dir`.
 *
 * @param dir directory that holds the index file
 * @param remoteLogSegmentMetadata remote segment the index belongs to
 * @return a [[File]] in `dir` named by [[remoteTimeIndexFileName]]
 */
def remoteTimeIndexFile(dir: File, remoteLogSegmentMetadata: RemoteLogSegmentMetadata): File =
  new File(dir, remoteTimeIndexFileName(remoteLogSegmentMetadata))
81+
82+
/**
 * Computes the on-disk file name of the time index for a remote segment.
 *
 * @param remoteLogSegmentMetadata remote segment the index belongs to
 * @return the index file name prefix for the segment followed by `UnifiedLog.TimeIndexFileSuffix`
 */
def remoteTimeIndexFileName(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): String =
  generateFileNamePrefixForIndex(remoteLogSegmentMetadata) + UnifiedLog.TimeIndexFileSuffix
86+
87+
/**
 * Builds the transaction index file handle for a remote segment inside `dir`.
 *
 * @param dir directory that holds the index file
 * @param remoteLogSegmentMetadata remote segment the index belongs to
 * @return a [[File]] in `dir` named by [[remoteTransactionIndexFileName]]
 */
def remoteTransactionIndexFile(dir: File, remoteLogSegmentMetadata: RemoteLogSegmentMetadata): File =
  new File(dir, remoteTransactionIndexFileName(remoteLogSegmentMetadata))
90+
91+
/**
 * Computes the on-disk file name of the transaction index for a remote segment.
 *
 * @param remoteLogSegmentMetadata remote segment the index belongs to
 * @return the index file name prefix for the segment followed by `UnifiedLog.TxnIndexFileSuffix`
 */
def remoteTransactionIndexFileName(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): String =
  generateFileNamePrefixForIndex(remoteLogSegmentMetadata) + UnifiedLog.TxnIndexFileSuffix
4295
}
4396

4497
@threadsafe
@@ -104,12 +157,19 @@ class Entry(val offsetIndex: OffsetIndex, val timeIndex: TimeIndex, val txnIndex
104157
inWriteLock(lock) {
105158
// close is no-op if entry is already marked for cleanup. Mmap resources are released during cleanup.
106159
if (!markedForCleanup) {
107-
Utils.closeQuietly(offsetIndex, "Closing the offset index.")
108-
Utils.closeQuietly(timeIndex, "Closing the time index.")
109-
Utils.closeQuietly(txnIndex, "Closing the transaction index.")
160+
Utils.closeQuietly(offsetIndex, "offset index")
161+
Utils.closeQuietly(timeIndex, "time index")
162+
Utils.closeQuietly(txnIndex, "transaction index")
110163
}
111164
}
112165
}
166+
167+
/**
 * Renders this entry as the file names (not full paths) of its time, transaction and
 * offset indexes.
 */
override def toString: String =
  s"RemoteIndexCacheEntry(timeIndex=${timeIndex.file.getName}, " +
    s"txnIndex=${txnIndex.file.getName}, " +
    s"offsetIndex=${offsetIndex.file.getName})"
113173
}
114174

115175
/**
@@ -168,11 +228,11 @@ class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageM
168228
.maximumSize(maxSize)
169229
// removalListener is invoked when either the entry is invalidated (means manual removal by the caller) or
170230
// evicted (means removal due to the policy)
171-
.removalListener((_: Uuid, entry: Entry, _: RemovalCause) => {
231+
.removalListener((key: Uuid, entry: Entry, _: RemovalCause) => {
172232
// Mark the entries for cleanup and add them to the queue to be garbage collected later by the background thread.
173233
entry.markForCleanup()
174234
if (!expiredIndexes.offer(entry)) {
175-
error(s"Error while inserting entry $entry into the cleaner queue")
235+
error(s"Error while inserting entry $entry for key $key into the cleaner queue because queue is full.")
176236
}
177237
})
178238
.build[Uuid, Entry]()
@@ -198,25 +258,22 @@ class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageM
198258
})
199259

200260
Files.list(cacheDir.toPath).forEach((path:Path) => {
201-
val pathStr = path.getFileName.toString
202-
val name = pathStr.substring(0, pathStr.lastIndexOf("_") + 1)
203-
204-
// Create entries for each path if all the index files exist.
205-
val firstIndex = name.indexOf('_')
206-
val offset = name.substring(0, firstIndex).toInt
207-
val uuid = Uuid.fromString(name.substring(firstIndex + 1, name.lastIndexOf('_')))
208-
261+
val indexFileName = path.getFileName.toString
262+
val uuid = remoteLogSegmentIdFromRemoteIndexFileName(indexFileName)
209263
// It is safe to update the internalCache non-atomically here since this function is always called by a single
210264
// thread only.
211265
if (!internalCache.asMap().containsKey(uuid)) {
212-
val offsetIndexFile = new File(cacheDir, name + UnifiedLog.IndexFileSuffix)
213-
val timestampIndexFile = new File(cacheDir, name + UnifiedLog.TimeIndexFileSuffix)
214-
val txnIndexFile = new File(cacheDir, name + UnifiedLog.TxnIndexFileSuffix)
266+
val fileNameWithoutDotExtensions = indexFileName.substring(0, indexFileName.indexOf("."))
267+
val offsetIndexFile = new File(cacheDir, fileNameWithoutDotExtensions + UnifiedLog.IndexFileSuffix)
268+
val timestampIndexFile = new File(cacheDir, fileNameWithoutDotExtensions + UnifiedLog.TimeIndexFileSuffix)
269+
val txnIndexFile = new File(cacheDir, fileNameWithoutDotExtensions + UnifiedLog.TxnIndexFileSuffix)
215270

271+
// Create entries for each path if all the index files exist.
216272
if (Files.exists(offsetIndexFile.toPath) &&
217273
Files.exists(timestampIndexFile.toPath) &&
218274
Files.exists(txnIndexFile.toPath)) {
219275

276+
val offset = offsetFromRemoteIndexFileName(indexFileName)
220277
val offsetIndex = new OffsetIndex(offsetIndexFile, offset, Int.MaxValue, false)
221278
offsetIndex.sanityCheck()
222279

@@ -240,26 +297,29 @@ class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageM
240297
init()
241298

242299
// Start cleaner thread that will clean the expired entries
243-
private[remote] var cleanerThread: ShutdownableThread = new ShutdownableThread(remoteLogIndexCacheCleanerThread) {
300+
private[remote] var cleanerThread: ShutdownableThread = new ShutdownableThread(RemoteLogIndexCacheCleanerThread) {
244301
setDaemon(true)
245302

246303
override def doWork(): Unit = {
304+
var expiredEntryOpt: Option[Entry] = None
247305
try {
248-
while (!isRemoteIndexCacheClosed.get()) {
249-
val entry = expiredIndexes.take()
250-
debug(s"Cleaning up index entry $entry")
251-
entry.cleanup()
252-
}
306+
expiredEntryOpt = Some(expiredIndexes.take())
307+
expiredEntryOpt.foreach( expiredEntry => {
308+
log.debug(s"Cleaning up index entry $expiredEntry")
309+
expiredEntry.cleanup()
310+
})
253311
} catch {
254-
case ex: InterruptedException =>
312+
case ie: InterruptedException =>
255313
// cleaner thread should only be interrupted when cache is being closed, else it's an error
256314
if (!isRemoteIndexCacheClosed.get()) {
257-
error("Cleaner thread received interruption but remote index cache is not closed", ex)
258-
throw ex
315+
log.error("Cleaner thread received interruption but remote index cache is not closed", ie)
316+
// propagate the InterruptedException outside to correctly close the thread.
317+
throw ie
259318
} else {
260-
debug("Cleaner thread was interrupted on cache shutdown")
319+
log.debug("Cleaner thread was interrupted on cache shutdown")
261320
}
262-
case ex: Exception => error("Error occurred while fetching/cleaning up expired entry", ex)
321+
// do not exit for exceptions other than InterruptedException
322+
case ex: Throwable => log.error(s"Error occurred while cleaning up expired entry $expiredEntryOpt", ex)
263323
}
264324
}
265325
}
@@ -273,16 +333,20 @@ class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageM
273333
}
274334

275335
inReadLock(lock) {
336+
// while this thread was waiting for lock, another thread may have changed the value of isRemoteIndexCacheClosed.
337+
// check for index close again
338+
if (isRemoteIndexCacheClosed.get()) {
339+
throw new IllegalStateException(s"Unable to fetch index for " +
340+
s"segment id=${remoteLogSegmentMetadata.remoteLogSegmentId().id()}. Index instance is already closed.")
341+
}
342+
276343
val cacheKey = remoteLogSegmentMetadata.remoteLogSegmentId().id()
277-
internalCache.get(cacheKey, (uuid: Uuid) => {
278-
def loadIndexFile[T](fileName: String,
279-
suffix: String,
344+
internalCache.get(cacheKey, (_: Uuid) => {
345+
def loadIndexFile[T](indexFile: File,
280346
fetchRemoteIndex: RemoteLogSegmentMetadata => InputStream,
281347
readIndex: File => T): T = {
282-
val indexFile = new File(cacheDir, fileName + suffix)
283-
284348
def fetchAndCreateIndex(): T = {
285-
val tmpIndexFile = new File(cacheDir, fileName + suffix + RemoteIndexCache.TmpFileSuffix)
349+
val tmpIndexFile = new File(cacheDir, indexFile.getName + RemoteIndexCache.TmpFileSuffix)
286350

287351
val inputStream = fetchRemoteIndex(remoteLogSegmentMetadata)
288352
try {
@@ -311,26 +375,26 @@ class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageM
311375
}
312376

313377
val startOffset = remoteLogSegmentMetadata.startOffset()
314-
// uuid.toString uses URL encoding which is safe for filenames and URLs.
315-
val fileName = startOffset.toString + "_" + uuid.toString + "_"
316-
317-
val offsetIndex: OffsetIndex = loadIndexFile(fileName, UnifiedLog.IndexFileSuffix,
378+
val offsetIndexFile = remoteOffsetIndexFile(cacheDir, remoteLogSegmentMetadata)
379+
val offsetIndex: OffsetIndex = loadIndexFile(offsetIndexFile,
318380
rlsMetadata => remoteStorageManager.fetchIndex(rlsMetadata, IndexType.OFFSET),
319381
file => {
320382
val index = new OffsetIndex(file, startOffset, Int.MaxValue, false)
321383
index.sanityCheck()
322384
index
323385
})
324386

325-
val timeIndex: TimeIndex = loadIndexFile(fileName, UnifiedLog.TimeIndexFileSuffix,
387+
val timeIndexFile = remoteTimeIndexFile(cacheDir, remoteLogSegmentMetadata)
388+
val timeIndex: TimeIndex = loadIndexFile(timeIndexFile,
326389
rlsMetadata => remoteStorageManager.fetchIndex(rlsMetadata, IndexType.TIMESTAMP),
327390
file => {
328391
val index = new TimeIndex(file, startOffset, Int.MaxValue, false)
329392
index.sanityCheck()
330393
index
331394
})
332395

333-
val txnIndex: TransactionIndex = loadIndexFile(fileName, UnifiedLog.TxnIndexFileSuffix,
396+
val txnIndexFile = remoteTransactionIndexFile(cacheDir, remoteLogSegmentMetadata)
397+
val txnIndex: TransactionIndex = loadIndexFile(txnIndexFile,
334398
rlsMetadata => remoteStorageManager.fetchIndex(rlsMetadata, IndexType.TRANSACTION),
335399
file => {
336400
val index = new TransactionIndex(startOffset, file)
@@ -367,7 +431,7 @@ class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageM
367431
info(s"Close initiated for RemoteIndexCache. Cache stats=${internalCache.stats}. " +
368432
s"Cache entries pending delete=${expiredIndexes.size()}")
369433
// Initiate shutdown for cleaning thread
370-
val shutdownRequired = cleanerThread.initiateShutdown()
434+
val shutdownRequired = cleanerThread.initiateShutdown
371435
// Close all the opened indexes to force unload mmap memory. This does not delete the index files from disk.
372436
internalCache.asMap().forEach((_, entry) => entry.close())
373437
// wait for cleaner thread to shutdown
@@ -379,4 +443,4 @@ class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageM
379443
}
380444
}
381445
}
382-
}
446+
}

0 commit comments

Comments
 (0)