From 3b3700dff6942d7319acb8c1c2725a58472d3717 Mon Sep 17 00:00:00 2001 From: loganzhu Date: Sun, 19 Jan 2025 21:53:46 +0800 Subject: [PATCH 1/4] KAFKA-18597: Fix max-buffer-utilization-percent is always 0 --- core/src/main/scala/kafka/log/LogCleaner.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 71747e69dc2af..254720c1173b8 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -119,12 +119,12 @@ class LogCleaner(initialConfig: CleanerConfig, * @param f to compute the result * @return the max value (int value) or 0 if there is no cleaner */ - private def maxOverCleanerThreads(f: CleanerThread => Double): Int = - cleaners.foldLeft(0.0d)((max: Double, thread: CleanerThread) => math.max(max, f(thread))).toInt + private def maxOverCleanerThreads(f: CleanerThread => Double): Double = + cleaners.foldLeft(0.0d)((max: Double, thread: CleanerThread) => math.max(max, f(thread))) /* a metric to track the maximum utilization of any thread's buffer in the last cleaning */ metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName, - () => maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100) + () => (maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100).toInt) /* a metric to track the recopy rate of each thread's last cleaning */ metricsGroup.newGauge(CleanerRecopyPercentMetricName, () => { @@ -134,12 +134,12 @@ class LogCleaner(initialConfig: CleanerConfig, }) /* a metric to track the maximum cleaning time for the last cleaning from each thread */ - metricsGroup.newGauge(MaxCleanTimeMetricName, () => maxOverCleanerThreads(_.lastStats.elapsedSecs)) + metricsGroup.newGauge(MaxCleanTimeMetricName, () => maxOverCleanerThreads(_.lastStats.elapsedSecs).toInt) // a metric to track delay between the time when a log is required to be compacted // as determined by max compaction lag and the time of last cleaner run. metricsGroup.newGauge(MaxCompactionDelayMetricsName, - () => maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000) + () => (maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000).toInt) metricsGroup.newGauge(DeadThreadCountMetricName, () => deadThreadCount) From 34773413e4c212d0d5deb482e772021f154eb0a4 Mon Sep 17 00:00:00 2001 From: loganzhu Date: Mon, 20 Jan 2025 11:39:40 +0800 Subject: [PATCH 2/4] KAFKA-18597: Fix test issue --- core/src/main/scala/kafka/log/LogCleaner.scala | 6 +++--- .../test/scala/unit/kafka/log/LogCleanerTest.scala | 13 +------------ 2 files changed, 4 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index afdbe8344b0a1..218c2f36340d9 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -116,10 +116,10 @@ class LogCleaner(initialConfig: CleanerConfig, /** * @param f to compute the result - * @return the max value (int value) or 0 if there is no cleaner + * @return the max value or 0 if there is no cleaner */ - private[log] def maxOverCleanerThreads(f: CleanerThread => Double): Int = - cleaners.map(f).maxOption.getOrElse(0.0d).toInt + private[log] def maxOverCleanerThreads(f: CleanerThread => Double): Double = + cleaners.map(f).maxOption.getOrElse(0.0d) /* a metric to track the maximum utilization of any thread's buffer in the last cleaning */ metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName, diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index b83a36a4b5ddd..b07bc48755adc 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -2070,18 +2070,7 @@ class LogCleanerTest extends Logging { cleaner3.lastStats.bufferUtilization = 0.65 cleaners += cleaner3 - assertEquals(0, logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization)) - - cleaners.clear() - - cleaner1.lastStats.bufferUtilization = 5d - cleaners += cleaner1 - cleaner2.lastStats.bufferUtilization = 6d - cleaners += cleaner2 - cleaner3.lastStats.bufferUtilization = 7d - cleaners += cleaner3 - - assertEquals(7, logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization)) + assertEquals(0.85, logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization)) } private def writeToLog(log: UnifiedLog, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = { From 495df20229cc90d51eb7f170d3d3225e0d2c93b5 Mon Sep 17 00:00:00 2001 From: loganzhu Date: Mon, 20 Jan 2025 15:10:08 +0800 Subject: [PATCH 3/4] KAFKA-18597: Add unit test --- .../src/main/scala/kafka/log/LogCleaner.scala | 3 +- .../scala/unit/kafka/log/LogCleanerTest.scala | 49 +++++++++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 218c2f36340d9..43dd15db9eb56 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -522,7 +522,8 @@ object LogCleaner { } - private val MaxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent" + // Visible for test. + private[log] val MaxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent" private val CleanerRecopyPercentMetricName = "cleaner-recopy-percent" private val MaxCleanTimeMetricName = "max-clean-time-secs" private val MaxCompactionDelayMetricsName = "max-compaction-delay-secs" diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index b07bc48755adc..055f36b7a24c2 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -17,6 +17,7 @@ package kafka.log +import kafka.log.LogCleaner.MaxBufferUtilizationPercentMetricName import kafka.server.KafkaConfig import kafka.utils.{CoreUtils, Logging, Pool, TestUtils} import org.apache.kafka.common.TopicPartition @@ -2073,6 +2074,54 @@ class LogCleanerTest extends Logging { assertEquals(0.85, logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization)) } + @Test + def testMaxBufferUtilizationPercentMetric(): Unit = { + val logCleaner = new LogCleaner( + new CleanerConfig(true), + logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()), + logs = new Pool[TopicPartition, UnifiedLog](), + logDirFailureChannel = new LogDirFailureChannel(1), + time = time + ) + + def assertMaxBufferUtilizationPercent(expected: Int): Unit = { + val gauge = logCleaner.metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName, + () => (logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100).toInt) + assertEquals(expected, gauge.value()) + } + + // No CleanerThreads + assertMaxBufferUtilizationPercent(0) + + val cleaners = logCleaner.cleaners + + val cleaner1 = new logCleaner.CleanerThread(1) + cleaner1.lastStats = new CleanerStats(time) + cleaner1.lastStats.bufferUtilization = 0.75 + cleaners += cleaner1 + + val cleaner2 = new logCleaner.CleanerThread(2) + cleaner2.lastStats = new CleanerStats(time) + cleaner2.lastStats.bufferUtilization = 0.85 + cleaners += cleaner2 + + val cleaner3 = new logCleaner.CleanerThread(3) + cleaner3.lastStats = new CleanerStats(time) + cleaner3.lastStats.bufferUtilization = 0.65 + cleaners += cleaner3 + + // expect the gauge value to reflect the maximum bufferUtilization + assertMaxBufferUtilizationPercent(85) + + // Update bufferUtilization and verify the gauge value updates + cleaner1.lastStats.bufferUtilization = 0.9 + assertMaxBufferUtilizationPercent(90) + + // All CleanerThreads have the same bufferUtilization + cleaners.foreach(_.lastStats.bufferUtilization = 0.5) + assertMaxBufferUtilizationPercent(50) + } + private def writeToLog(log: UnifiedLog, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = { for (((key, value), offset) <- keysAndValues.zip(offsetSeq)) yield log.appendAsFollower(messageWithOffset(key, value, offset)).lastOffset From 3d4f8e0af50f74005b65b264f115204035ec6bc4 Mon Sep 17 00:00:00 2001 From: loganzhu Date: Fri, 24 Jan 2025 12:56:29 +0800 Subject: [PATCH 4/4] KAFKA-18597: Add unit tests for additional impacted metrics in LogCleanerTest - Added testMaxBufferUtilizationPercentMetric to validate maximum buffer utilization metric. - Added testMaxCleanTimeMetric to verify the maximum clean time metric. - Added testMaxCompactionDelayMetrics to ensure correctness of maximum compaction delay metric. --- .../src/main/scala/kafka/log/LogCleaner.scala | 4 +- .../scala/unit/kafka/log/LogCleanerTest.scala | 170 +++++++++++++----- 2 files changed, 127 insertions(+), 47 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 43dd15db9eb56..4f5aaa4137557 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -525,8 +525,8 @@ object LogCleaner { // Visible for test. private[log] val MaxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent" private val CleanerRecopyPercentMetricName = "cleaner-recopy-percent" - private val MaxCleanTimeMetricName = "max-clean-time-secs" - private val MaxCompactionDelayMetricsName = "max-compaction-delay-secs" + private[log] val MaxCleanTimeMetricName = "max-clean-time-secs" + private[log] val MaxCompactionDelayMetricsName = "max-compaction-delay-secs" private val DeadThreadCountMetricName = "DeadThreadCount" // package private for testing private[log] val MetricNames = Set( diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 055f36b7a24c2..7b22628c382bc 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -17,7 +17,7 @@ package kafka.log -import kafka.log.LogCleaner.MaxBufferUtilizationPercentMetricName +import kafka.log.LogCleaner.{MaxBufferUtilizationPercentMetricName, MaxCleanTimeMetricName, MaxCompactionDelayMetricsName} import kafka.server.KafkaConfig import kafka.utils.{CoreUtils, Logging, Pool, TestUtils} import org.apache.kafka.common.TopicPartition @@ -2047,35 +2047,59 @@ class LogCleanerTest extends Logging { } @Test - def testMaxOverCleanerThreads(): Unit = { - val logCleaner = new LogCleaner(new CleanerConfig(true), + def testMaxBufferUtilizationPercentMetric(): Unit = { + val logCleaner = new LogCleaner( + new CleanerConfig(true), logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()), logs = new Pool[TopicPartition, UnifiedLog](), logDirFailureChannel = new LogDirFailureChannel(1), - time = time) + time = time + ) + + def assertMaxBufferUtilizationPercent(expected: Int): Unit = { + val gauge = logCleaner.metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName, + () => (logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100).toInt) + assertEquals(expected, gauge.value()) + } + + try { + // No CleanerThreads + assertMaxBufferUtilizationPercent(0) + + val cleaners = logCleaner.cleaners - val cleaners = logCleaner.cleaners + val cleaner1 = new logCleaner.CleanerThread(1) + cleaner1.lastStats = new CleanerStats(time) + cleaner1.lastStats.bufferUtilization = 0.75 + cleaners += cleaner1 - val cleaner1 = new logCleaner.CleanerThread(1) - cleaner1.lastStats = new CleanerStats(time) - cleaner1.lastStats.bufferUtilization = 0.75 - cleaners += cleaner1 + val cleaner2 = new logCleaner.CleanerThread(2) + cleaner2.lastStats = new CleanerStats(time) + cleaner2.lastStats.bufferUtilization = 0.85 + cleaners += cleaner2 - val cleaner2 = new logCleaner.CleanerThread(2) - cleaner2.lastStats = new CleanerStats(time) - cleaner2.lastStats.bufferUtilization = 0.85 - cleaners += cleaner2 + val cleaner3 = new logCleaner.CleanerThread(3) + cleaner3.lastStats = new CleanerStats(time) + cleaner3.lastStats.bufferUtilization = 0.65 + cleaners += cleaner3 - val cleaner3 = new logCleaner.CleanerThread(3) - cleaner3.lastStats = new CleanerStats(time) - cleaner3.lastStats.bufferUtilization = 0.65 - cleaners += cleaner3 + // expect the gauge value to reflect the maximum bufferUtilization + assertMaxBufferUtilizationPercent(85) - assertEquals(0.85, logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization)) + // Update bufferUtilization and verify the gauge value updates + cleaner1.lastStats.bufferUtilization = 0.9 + assertMaxBufferUtilizationPercent(90) + + // All CleanerThreads have the same bufferUtilization + cleaners.foreach(_.lastStats.bufferUtilization = 0.5) + assertMaxBufferUtilizationPercent(50) + } finally { + logCleaner.shutdown() + } } @Test - def testMaxBufferUtilizationPercentMetric(): Unit = { + def testMaxCleanTimeMetric(): Unit = { val logCleaner = new LogCleaner( new CleanerConfig(true), logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()), @@ -2084,42 +2108,98 @@ class LogCleanerTest extends Logging { time = time ) - def assertMaxBufferUtilizationPercent(expected: Int): Unit = { - val gauge = logCleaner.metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName, - () => (logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100).toInt) + def assertMaxCleanTime(expected: Int): Unit = { + val gauge = logCleaner.metricsGroup.newGauge(MaxCleanTimeMetricName, + () => logCleaner.maxOverCleanerThreads(_.lastStats.elapsedSecs).toInt) + assertEquals(expected, gauge.value()) + } + + try { + // No CleanerThreads + assertMaxCleanTime(0) + + val cleaners = logCleaner.cleaners + + val cleaner1 = new logCleaner.CleanerThread(1) + cleaner1.lastStats = new CleanerStats(time) + cleaner1.lastStats.endTime = cleaner1.lastStats.startTime + 1_000L + cleaners += cleaner1 + + val cleaner2 = new logCleaner.CleanerThread(2) + cleaner2.lastStats = new CleanerStats(time) + cleaner2.lastStats.endTime = cleaner2.lastStats.startTime + 2_000L + cleaners += cleaner2 + + val cleaner3 = new logCleaner.CleanerThread(3) + cleaner3.lastStats = new CleanerStats(time) + cleaner3.lastStats.endTime = cleaner3.lastStats.startTime + 3_000L + cleaners += cleaner3 + + // expect the gauge value to reflect the maximum cleanTime + assertMaxCleanTime(3) + + // Update cleanTime and verify the gauge value updates + cleaner1.lastStats.endTime = cleaner1.lastStats.startTime + 4_000L + assertMaxCleanTime(4) + + // All CleanerThreads have the same cleanTime + cleaners.foreach(cleaner => cleaner.lastStats.endTime = cleaner.lastStats.startTime + 1_500L) + assertMaxCleanTime(1) + } finally { + logCleaner.shutdown() + } + } + + @Test + def testMaxCompactionDelayMetrics(): Unit = { + val logCleaner = new LogCleaner( + new CleanerConfig(true), + logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()), + logs = new Pool[TopicPartition, UnifiedLog](), + logDirFailureChannel = new LogDirFailureChannel(1), + time = time + ) + + def assertMaxCompactionDelay(expected: Int): Unit = { + val gauge = logCleaner.metricsGroup.newGauge(MaxCompactionDelayMetricsName, + () => (logCleaner.maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000).toInt) assertEquals(expected, gauge.value()) } - // No CleanerThreads - assertMaxBufferUtilizationPercent(0) + try { + // No CleanerThreads + assertMaxCompactionDelay(0) - val cleaners = logCleaner.cleaners + val cleaners = logCleaner.cleaners - val cleaner1 = new logCleaner.CleanerThread(1) - cleaner1.lastStats = new CleanerStats(time) - cleaner1.lastStats.bufferUtilization = 0.75 - cleaners += cleaner1 + val cleaner1 = new logCleaner.CleanerThread(1) + cleaner1.lastStats = new CleanerStats(time) + cleaner1.lastPreCleanStats.maxCompactionDelayMs = 1_000L + cleaners += cleaner1 - val cleaner2 = new logCleaner.CleanerThread(2) - cleaner2.lastStats = new CleanerStats(time) - cleaner2.lastStats.bufferUtilization = 0.85 - cleaners += cleaner2 + val cleaner2 = new logCleaner.CleanerThread(2) + cleaner2.lastStats = new CleanerStats(time) + cleaner2.lastPreCleanStats.maxCompactionDelayMs = 2_000L + cleaners += cleaner2 - val cleaner3 = new logCleaner.CleanerThread(3) - cleaner3.lastStats = new CleanerStats(time) - cleaner3.lastStats.bufferUtilization = 0.65 - cleaners += cleaner3 + val cleaner3 = new logCleaner.CleanerThread(3) + cleaner3.lastStats = new CleanerStats(time) + cleaner3.lastPreCleanStats.maxCompactionDelayMs = 3_000L + cleaners += cleaner3 - // expect the gauge value to reflect the maximum bufferUtilization - assertMaxBufferUtilizationPercent(85) + // expect the gauge value to reflect the maximum CompactionDelay + assertMaxCompactionDelay(3) - // Update bufferUtilization and verify the gauge value updates - cleaner1.lastStats.bufferUtilization = 0.9 - assertMaxBufferUtilizationPercent(90) + // Update CompactionDelay and verify the gauge value updates + cleaner1.lastPreCleanStats.maxCompactionDelayMs = 4_000L + assertMaxCompactionDelay(4) - // All CleanerThreads have the same bufferUtilization - cleaners.foreach(_.lastStats.bufferUtilization = 0.5) - assertMaxBufferUtilizationPercent(50) + // All CleanerThreads have the same CompactionDelay + cleaners.foreach(_.lastPreCleanStats.maxCompactionDelayMs = 1_500L) + assertMaxCompactionDelay(1) + } finally { + logCleaner.shutdown() + } } private def writeToLog(log: UnifiedLog, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {