Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18597 Fix max-buffer-utilization-percent is always 0 #18627

Merged
merged 5 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions core/src/main/scala/kafka/log/LogCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,14 @@ class LogCleaner(initialConfig: CleanerConfig,

/**
* @param f to compute the result
* @return the max value (int value) or 0 if there is no cleaner
* @return the max value or 0 if there is no cleaner
*/
private[log] def maxOverCleanerThreads(f: CleanerThread => Double): Int =
cleaners.map(f).maxOption.getOrElse(0.0d).toInt
private[log] def maxOverCleanerThreads(f: CleanerThread => Double): Double =
cleaners.map(f).maxOption.getOrElse(0.0d)

/* a metric to track the maximum utilization of any thread's buffer in the last cleaning */
metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName,
() => maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100)
() => (maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100).toInt)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add unit tests for the other impacted metrics?


/* a metric to track the recopy rate of each thread's last cleaning */
metricsGroup.newGauge(CleanerRecopyPercentMetricName, () => {
Expand All @@ -133,12 +133,12 @@ class LogCleaner(initialConfig: CleanerConfig,
})

/* a metric to track the maximum cleaning time for the last cleaning from each thread */
metricsGroup.newGauge(MaxCleanTimeMetricName, () => maxOverCleanerThreads(_.lastStats.elapsedSecs))
metricsGroup.newGauge(MaxCleanTimeMetricName, () => maxOverCleanerThreads(_.lastStats.elapsedSecs).toInt)

// a metric to track delay between the time when a log is required to be compacted
// as determined by max compaction lag and the time of last cleaner run.
metricsGroup.newGauge(MaxCompactionDelayMetricsName,
() => maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000)
() => (maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000).toInt)

metricsGroup.newGauge(DeadThreadCountMetricName, () => deadThreadCount)

Expand Down Expand Up @@ -522,7 +522,8 @@ object LogCleaner {

}

private val MaxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent"
// Visible for testing.
private[log] val MaxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent"
private val CleanerRecopyPercentMetricName = "cleaner-recopy-percent"
private val MaxCleanTimeMetricName = "max-clean-time-secs"
private val MaxCompactionDelayMetricsName = "max-compaction-delay-secs"
Expand Down
50 changes: 44 additions & 6 deletions core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package kafka.log

import kafka.log.LogCleaner.MaxBufferUtilizationPercentMetricName
import kafka.server.KafkaConfig
import kafka.utils.{CoreUtils, Logging, Pool, TestUtils}
import org.apache.kafka.common.TopicPartition
Expand Down Expand Up @@ -2070,18 +2071,55 @@ class LogCleanerTest extends Logging {
cleaner3.lastStats.bufferUtilization = 0.65
cleaners += cleaner3

assertEquals(0, logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization))
assertEquals(0.85, logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization))
}

@Test
def testMaxBufferUtilizationPercentMetric(): Unit = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems this new unit test is similar to testMaxOverCleanerThreads, right? If so, could you please remove testMaxOverCleanerThreads?

val logCleaner = new LogCleaner(
new CleanerConfig(true),
logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
logs = new Pool[TopicPartition, UnifiedLog](),
logDirFailureChannel = new LogDirFailureChannel(1),
time = time
)

def assertMaxBufferUtilizationPercent(expected: Int): Unit = {
val gauge = logCleaner.metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName,
() => (logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100).toInt)
assertEquals(expected, gauge.value())
}

cleaners.clear()
// No CleanerThreads
assertMaxBufferUtilizationPercent(0)

cleaner1.lastStats.bufferUtilization = 5d
val cleaners = logCleaner.cleaners

val cleaner1 = new logCleaner.CleanerThread(1)
cleaner1.lastStats = new CleanerStats(time)
cleaner1.lastStats.bufferUtilization = 0.75
cleaners += cleaner1
cleaner2.lastStats.bufferUtilization = 6d

val cleaner2 = new logCleaner.CleanerThread(2)
cleaner2.lastStats = new CleanerStats(time)
cleaner2.lastStats.bufferUtilization = 0.85
cleaners += cleaner2
cleaner3.lastStats.bufferUtilization = 7d

val cleaner3 = new logCleaner.CleanerThread(3)
cleaner3.lastStats = new CleanerStats(time)
cleaner3.lastStats.bufferUtilization = 0.65
cleaners += cleaner3

assertEquals(7, logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization))
// expect the gauge value to reflect the maximum bufferUtilization
assertMaxBufferUtilizationPercent(85)

// Update bufferUtilization and verify the gauge value updates
cleaner1.lastStats.bufferUtilization = 0.9
assertMaxBufferUtilizationPercent(90)

// All CleanerThreads have the same bufferUtilization
cleaners.foreach(_.lastStats.bufferUtilization = 0.5)
assertMaxBufferUtilizationPercent(50)
}

private def writeToLog(log: UnifiedLog, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
Expand Down
Loading