diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 71747e69dc2af..c07437e68d599 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -115,12 +115,11 @@ class LogCleaner(initialConfig: CleanerConfig,
   private[log] val cleaners = mutable.ArrayBuffer[CleanerThread]()
 
   /**
-   * scala 2.12 does not support maxOption so we handle the empty manually.
    * @param f to compute the result
    * @return the max value (int value) or 0 if there is no cleaner
    */
-  private def maxOverCleanerThreads(f: CleanerThread => Double): Int =
-    cleaners.foldLeft(0.0d)((max: Double, thread: CleanerThread) => math.max(max, f(thread))).toInt
+  private[log] def maxOverCleanerThreads(f: CleanerThread => Double): Int =
+    cleaners.map(f).maxOption.getOrElse(0.0d).toInt
 
   /* a metric to track the maximum utilization of any thread's buffer in the last cleaning */
   metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName,
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 3e126e45ffeb0..983e87dd4379b 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -532,21 +532,15 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   def maintainUncleanablePartitions(): Unit = {
     // Remove deleted partitions from uncleanablePartitions
    inLock(lock) {
-      // Note: we don't use retain or filterInPlace method in this function because retain is deprecated in
-      // scala 2.13 while filterInPlace is not available in scala 2.12.
-
       // Remove deleted partitions
-      uncleanablePartitions.values.foreach {
-        partitions =>
-          val partitionsToRemove = partitions.filterNot(logs.contains).toList
-          partitionsToRemove.foreach { partitions.remove }
+      uncleanablePartitions.values.foreach { partitions =>
+        partitions.filterInPlace(logs.contains)
       }
 
       // Remove entries with empty partition set.
-      val logDirsToRemove = uncleanablePartitions.filter {
-        case (_, partitions) => partitions.isEmpty
-      }.keys.toList
-      logDirsToRemove.foreach { uncleanablePartitions.remove }
+      uncleanablePartitions.filterInPlace {
+        case (_, partitions) => partitions.nonEmpty
+      }
     }
   }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index f7746c7a69b94..b83a36a4b5ddd 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -2045,6 +2045,45 @@ class LogCleanerTest extends Logging {
     }
   }
 
+  @Test
+  def testMaxOverCleanerThreads(): Unit = {
+    val logCleaner = new LogCleaner(new CleanerConfig(true),
+      logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
+      logs = new Pool[TopicPartition, UnifiedLog](),
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      time = time)
+
+    val cleaners = logCleaner.cleaners
+
+    val cleaner1 = new logCleaner.CleanerThread(1)
+    cleaner1.lastStats = new CleanerStats(time)
+    cleaner1.lastStats.bufferUtilization = 0.75
+    cleaners += cleaner1
+
+    val cleaner2 = new logCleaner.CleanerThread(2)
+    cleaner2.lastStats = new CleanerStats(time)
+    cleaner2.lastStats.bufferUtilization = 0.85
+    cleaners += cleaner2
+
+    val cleaner3 = new logCleaner.CleanerThread(3)
+    cleaner3.lastStats = new CleanerStats(time)
+    cleaner3.lastStats.bufferUtilization = 0.65
+    cleaners += cleaner3
+
+    assertEquals(0, logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization))
+
+    cleaners.clear()
+
+    cleaner1.lastStats.bufferUtilization = 5d
+    cleaners += cleaner1
+    cleaner2.lastStats.bufferUtilization = 6d
+    cleaners += cleaner2
+    cleaner3.lastStats.bufferUtilization = 7d
+    cleaners += cleaner3
+
+    assertEquals(7, logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization))
+  }
+
   private def writeToLog(log: UnifiedLog, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
     for (((key, value), offset) <- keysAndValues.zip(offsetSeq)) yield log.appendAsFollower(messageWithOffset(key, value, offset)).lastOffset
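
Not part of the patch: a minimal standalone sketch (the object and value names below are hypothetical, not Kafka identifiers) of the two Scala 2.13 idioms the diff adopts. maxOption returns None for an empty collection, so getOrElse(0.0d) keeps the "0 when there are no cleaner threads" behaviour of the removed foldLeft, and filterInPlace mutates the collection directly, replacing the old collect-keys-then-remove pattern in maintainUncleanablePartitions.

import scala.collection.mutable

object Scala213IdiomsSketch extends App {
  // maxOption-based max, mirroring the shape of the new maxOverCleanerThreads:
  // an empty sequence yields None, so the result falls back to 0.
  def maxOver(utilizations: Seq[Double]): Int =
    utilizations.maxOption.getOrElse(0.0d).toInt

  println(maxOver(Seq.empty))        // prints 0 (no threads)
  println(maxOver(Seq(5d, 6d, 7d)))  // prints 7

  // filterInPlace on a mutable Map: entries whose partition set is empty are
  // dropped in place, with no intermediate list of keys to remove.
  val uncleanableByDir = mutable.Map(
    "dir-a" -> mutable.Set(0, 1),
    "dir-b" -> mutable.Set.empty[Int]
  )
  uncleanableByDir.filterInPlace { case (_, partitions) => partitions.nonEmpty }
  println(uncleanableByDir.keySet)   // only dir-a remains
}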