diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 254720c1173b8..218c2f36340d9 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -115,12 +115,11 @@ class LogCleaner(initialConfig: CleanerConfig, private[log] val cleaners = mutable.ArrayBuffer[CleanerThread]() /** - * scala 2.12 does not support maxOption so we handle the empty manually. * @param f to compute the result - * @return the max value (int value) or 0 if there is no cleaner + * @return the max value or 0 if there is no cleaner */ - private def maxOverCleanerThreads(f: CleanerThread => Double): Double = - cleaners.foldLeft(0.0d)((max: Double, thread: CleanerThread) => math.max(max, f(thread))) + private[log] def maxOverCleanerThreads(f: CleanerThread => Double): Double = + cleaners.map(f).maxOption.getOrElse(0.0d) /* a metric to track the maximum utilization of any thread's buffer in the last cleaning */ metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName, diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index f7746c7a69b94..b07bc48755adc 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -2045,6 +2045,34 @@ class LogCleanerTest extends Logging { } } + @Test + def testMaxOverCleanerThreads(): Unit = { + val logCleaner = new LogCleaner(new CleanerConfig(true), + logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()), + logs = new Pool[TopicPartition, UnifiedLog](), + logDirFailureChannel = new LogDirFailureChannel(1), + time = time) + + val cleaners = logCleaner.cleaners + + val cleaner1 = new logCleaner.CleanerThread(1) + cleaner1.lastStats = new CleanerStats(time) + cleaner1.lastStats.bufferUtilization = 0.75 + cleaners += cleaner1 + + val cleaner2 = new logCleaner.CleanerThread(2) + cleaner2.lastStats = new CleanerStats(time) + cleaner2.lastStats.bufferUtilization = 0.85 + cleaners += cleaner2 + + val cleaner3 = new logCleaner.CleanerThread(3) + cleaner3.lastStats = new CleanerStats(time) + cleaner3.lastStats.bufferUtilization = 0.65 + cleaners += cleaner3 + + assertEquals(0.85, logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization)) + } + private def writeToLog(log: UnifiedLog, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = { for (((key, value), offset) <- keysAndValues.zip(offsetSeq)) yield log.appendAsFollower(messageWithOffset(key, value, offset)).lastOffset