Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

KAFKA-18597 Fix max-buffer-utilization-percent is always 0 #18627

Merged
merged 5 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions core/src/main/scala/kafka/log/LogCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,14 @@ class LogCleaner(initialConfig: CleanerConfig,

/**
* @param f to compute the result
* @return the max value (int value) or 0 if there is no cleaner
* @return the max value or 0 if there is no cleaner
*/
private[log] def maxOverCleanerThreads(f: CleanerThread => Double): Int =
cleaners.map(f).maxOption.getOrElse(0.0d).toInt
// Folds `f` over every live cleaner thread and returns the largest result as a
// Double; 0.0 when no cleaner threads exist (maxOption is None on an empty list).
// Callers that expose this via an integer gauge apply their own conversion.
private[log] def maxOverCleanerThreads(f: CleanerThread => Double): Double =
  cleaners.map(f).maxOption.getOrElse(0.0d)

/* a metric to track the maximum utilization of any thread's buffer in the last cleaning */
metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName,
() => maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100)
() => (maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100).toInt)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add unit tests for the other impacted metrics?


/* a metric to track the recopy rate of each thread's last cleaning */
metricsGroup.newGauge(CleanerRecopyPercentMetricName, () => {
Expand All @@ -133,12 +133,12 @@ class LogCleaner(initialConfig: CleanerConfig,
})

/* a metric to track the maximum cleaning time for the last cleaning from each thread */
metricsGroup.newGauge(MaxCleanTimeMetricName, () => maxOverCleanerThreads(_.lastStats.elapsedSecs))
metricsGroup.newGauge(MaxCleanTimeMetricName, () => maxOverCleanerThreads(_.lastStats.elapsedSecs).toInt)

// a metric to track delay between the time when a log is required to be compacted
// as determined by max compaction lag and the time of last cleaner run.
metricsGroup.newGauge(MaxCompactionDelayMetricsName,
() => maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000)
() => (maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000).toInt)

metricsGroup.newGauge(DeadThreadCountMetricName, () => deadThreadCount)

Expand Down Expand Up @@ -522,10 +522,11 @@ object LogCleaner {

}

private val MaxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent"
// Visible for testing.
private[log] val MaxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent"
private val CleanerRecopyPercentMetricName = "cleaner-recopy-percent"
private val MaxCleanTimeMetricName = "max-clean-time-secs"
private val MaxCompactionDelayMetricsName = "max-compaction-delay-secs"
private[log] val MaxCleanTimeMetricName = "max-clean-time-secs"
private[log] val MaxCompactionDelayMetricsName = "max-compaction-delay-secs"
private val DeadThreadCountMetricName = "DeadThreadCount"
// package private for testing
private[log] val MetricNames = Set(
Expand Down
168 changes: 143 additions & 25 deletions core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package kafka.log

import kafka.log.LogCleaner.{MaxBufferUtilizationPercentMetricName, MaxCleanTimeMetricName, MaxCompactionDelayMetricsName}
import kafka.server.KafkaConfig
import kafka.utils.{CoreUtils, Logging, Pool, TestUtils}
import org.apache.kafka.common.TopicPartition
Expand Down Expand Up @@ -2046,42 +2047,159 @@ class LogCleanerTest extends Logging {
}

@Test
def testMaxOverCleanerThreads(): Unit = {
val logCleaner = new LogCleaner(new CleanerConfig(true),
def testMaxBufferUtilizationPercentMetric(): Unit = {
  val cleaner = new LogCleaner(
    new CleanerConfig(true),
    logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
    logs = new Pool[TopicPartition, UnifiedLog](),
    logDirFailureChannel = new LogDirFailureChannel(1),
    time = time
  )

  // Registers the max-buffer-utilization-percent gauge exactly the way the
  // production code does, then asserts the integer percentage it reports.
  def verifyGauge(expected: Int): Unit = {
    val gauge = cleaner.metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName,
      () => (cleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100).toInt)
    assertEquals(expected, gauge.value())
  }

  try {
    // With no cleaner threads registered the gauge must report 0.
    verifyGauge(0)

    // Register three threads (ids 1..3) with distinct buffer utilizations.
    val utilizations = Seq(0.75, 0.85, 0.65)
    val threads = utilizations.zipWithIndex.map { case (utilization, idx) =>
      val thread = new cleaner.CleanerThread(idx + 1)
      thread.lastStats = new CleanerStats(time)
      thread.lastStats.bufferUtilization = utilization
      cleaner.cleaners += thread
      thread
    }

    // The gauge reports the maximum utilization as a percentage.
    verifyGauge(85)

    // Raising one thread's utilization moves the maximum with it.
    threads.head.lastStats.bufferUtilization = 0.9
    verifyGauge(90)

    // When every thread reports the same value, that value is the maximum.
    cleaner.cleaners.foreach(_.lastStats.bufferUtilization = 0.5)
    verifyGauge(50)
  } finally {
    cleaner.shutdown()
  }
}

@Test
def testMaxCleanTimeMetric(): Unit = {
  val logCleaner = new LogCleaner(
    new CleanerConfig(true),
    logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
    logs = new Pool[TopicPartition, UnifiedLog](),
    logDirFailureChannel = new LogDirFailureChannel(1),
    time = time
  )

  // Registers the max-clean-time-secs gauge the same way LogCleaner does and
  // asserts the whole-second value it reports.
  def assertMaxCleanTime(expected: Int): Unit = {
    val gauge = logCleaner.metricsGroup.newGauge(MaxCleanTimeMetricName,
      () => logCleaner.maxOverCleanerThreads(_.lastStats.elapsedSecs).toInt)
    assertEquals(expected, gauge.value())
  }

  val cleaners = logCleaner.cleaners
  try {
    // No CleanerThreads yet: the gauge must report 0.
    assertMaxCleanTime(0)

    // elapsedSecs is derived from (endTime - startTime), so drive the metric
    // by moving each thread's endTime relative to its startTime.
    val cleaner1 = new logCleaner.CleanerThread(1)
    cleaner1.lastStats = new CleanerStats(time)
    cleaner1.lastStats.endTime = cleaner1.lastStats.startTime + 1_000L
    cleaners += cleaner1

    val cleaner2 = new logCleaner.CleanerThread(2)
    cleaner2.lastStats = new CleanerStats(time)
    cleaner2.lastStats.endTime = cleaner2.lastStats.startTime + 2_000L
    cleaners += cleaner2

    val cleaner3 = new logCleaner.CleanerThread(3)
    cleaner3.lastStats = new CleanerStats(time)
    cleaner3.lastStats.endTime = cleaner3.lastStats.startTime + 3_000L
    cleaners += cleaner3

    // Expect the gauge value to reflect the maximum clean time (3 seconds).
    assertMaxCleanTime(3)

    // Update one thread's clean time and verify the gauge value updates.
    cleaner1.lastStats.endTime = cleaner1.lastStats.startTime + 4_000L
    assertMaxCleanTime(4)

    // All CleanerThreads have the same clean time; 1500 ms truncates to 1 second.
    cleaners.foreach(cleaner => cleaner.lastStats.endTime = cleaner.lastStats.startTime + 1_500L)
    assertMaxCleanTime(1)
  } finally {
    logCleaner.shutdown()
  }
}

@Test
def testMaxCompactionDelayMetrics(): Unit = {
  val cleaner = new LogCleaner(
    new CleanerConfig(true),
    logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
    logs = new Pool[TopicPartition, UnifiedLog](),
    logDirFailureChannel = new LogDirFailureChannel(1),
    time = time
  )

  // Mirrors the production max-compaction-delay-secs gauge registration and
  // asserts the whole-second value it reports.
  def verifyGauge(expected: Int): Unit = {
    val gauge = cleaner.metricsGroup.newGauge(MaxCompactionDelayMetricsName,
      () => (cleaner.maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000).toInt)
    assertEquals(expected, gauge.value())
  }

  try {
    // With no cleaner threads registered the gauge must report 0.
    verifyGauge(0)

    // Register three threads (ids 1..3) with distinct compaction delays in ms.
    val delaysMs = Seq(1_000L, 2_000L, 3_000L)
    val threads = delaysMs.zipWithIndex.map { case (delayMs, idx) =>
      val thread = new cleaner.CleanerThread(idx + 1)
      thread.lastStats = new CleanerStats(time)
      thread.lastPreCleanStats.maxCompactionDelayMs = delayMs
      cleaner.cleaners += thread
      thread
    }

    // The gauge reports the maximum delay, converted to whole seconds.
    verifyGauge(3)

    // Raising one thread's delay moves the maximum with it.
    threads.head.lastPreCleanStats.maxCompactionDelayMs = 4_000L
    verifyGauge(4)

    // Identical delays everywhere: 1500 ms truncates to 1 second.
    cleaner.cleaners.foreach(_.lastPreCleanStats.maxCompactionDelayMs = 1_500L)
    verifyGauge(1)
  } finally {
    cleaner.shutdown()
  }
}

private def writeToLog(log: UnifiedLog, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
Expand Down
Loading