Skip to content

Commit

Permalink
KAFKA-18597: Fix test issue
Browse files Browse the repository at this point in the history
  • Loading branch information
LoganZhuZzz committed Jan 20, 2025
2 parents 3b3700d + 9649902 commit 66a8451
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 4 deletions.
7 changes: 3 additions & 4 deletions core/src/main/scala/kafka/log/LogCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,11 @@ class LogCleaner(initialConfig: CleanerConfig,
private[log] val cleaners = mutable.ArrayBuffer[CleanerThread]()

/**
* scala 2.12 does not support maxOption so we handle the empty manually.
* @param f to compute the result
* @return the max value (int value) or 0 if there is no cleaner
* @return the max value or 0 if there is no cleaner
*/
private def maxOverCleanerThreads(f: CleanerThread => Double): Double =
cleaners.foldLeft(0.0d)((max: Double, thread: CleanerThread) => math.max(max, f(thread)))
private[log] def maxOverCleanerThreads(f: CleanerThread => Double): Double =
cleaners.map(f).maxOption.getOrElse(0.0d)

/* a metric to track the maximum utilization of any thread's buffer in the last cleaning */
metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName,
Expand Down
28 changes: 28 additions & 0 deletions core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2045,6 +2045,34 @@ class LogCleanerTest extends Logging {
}
}

@Test
def testMaxOverCleanerThreads(): Unit = {
val logCleaner = new LogCleaner(new CleanerConfig(true),
logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
logs = new Pool[TopicPartition, UnifiedLog](),
logDirFailureChannel = new LogDirFailureChannel(1),
time = time)

val cleaners = logCleaner.cleaners

val cleaner1 = new logCleaner.CleanerThread(1)
cleaner1.lastStats = new CleanerStats(time)
cleaner1.lastStats.bufferUtilization = 0.75
cleaners += cleaner1

val cleaner2 = new logCleaner.CleanerThread(2)
cleaner2.lastStats = new CleanerStats(time)
cleaner2.lastStats.bufferUtilization = 0.85
cleaners += cleaner2

val cleaner3 = new logCleaner.CleanerThread(3)
cleaner3.lastStats = new CleanerStats(time)
cleaner3.lastStats.bufferUtilization = 0.65
cleaners += cleaner3

assertEquals(0.85, logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization))
}

private def writeToLog(log: UnifiedLog, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
for (((key, value), offset) <- keysAndValues.zip(offsetSeq))
yield log.appendAsFollower(messageWithOffset(key, value, offset)).lastOffset
Expand Down

0 comments on commit 66a8451

Please sign in to comment.