
KAFKA-17668: Clean-up LogCleaner#maxOverCleanerThreads and LogCleanerManager#maintainUncleanablePartitions #17390

Merged · 14 commits · Jan 19, 2025 · Changes shown from 11 commits
core/src/main/scala/kafka/log/LogCleaner.scala (5 changes: 2 additions & 3 deletions)

@@ -115,12 +115,11 @@ class LogCleaner(initialConfig: CleanerConfig,
private[log] val cleaners = mutable.ArrayBuffer[CleanerThread]()
private[log] val cleaners = mutable.ArrayBuffer[CleanerThread]()

  /**
-  * scala 2.12 does not support maxOption so we handle the empty manually.
   * @param f to compute the result
   * @return the max value (int value) or 0 if there is no cleaner
   */
- private def maxOverCleanerThreads(f: CleanerThread => Double): Int =
-   cleaners.foldLeft(0.0d)((max: Double, thread: CleanerThread) => math.max(max, f(thread))).toInt
+ private[log] def maxOverCleanerThreads(f: CleanerThread => Double): Int =
+   cleaners.map(f).maxOption.getOrElse(0.0d).toInt

/* a metric to track the maximum utilization of any thread's buffer in the last cleaning */
metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName,
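The refactor above swaps a manual foldLeft for Scala 2.13's maxOption, which the scaladoc notes was unavailable on Scala 2.12. A standalone sketch (illustrative names, not the Kafka source) showing that the two forms agree, including on the empty case the old code handled by hand:

```scala
// Sketch: the old foldLeft form and the new maxOption form of
// maxOverCleanerThreads behave identically, including when there
// are no cleaner threads at all (both yield 0).
object MaxOverThreadsSketch {
  def maxFold(utilizations: Seq[Double]): Int =
    utilizations.foldLeft(0.0)((max, v) => math.max(max, v)).toInt

  def maxOpt(utilizations: Seq[Double]): Int =
    utilizations.maxOption.getOrElse(0.0).toInt

  def main(args: Array[String]): Unit = {
    assert(maxFold(Nil) == 0 && maxOpt(Nil) == 0)  // empty: both yield 0
    val sample = Seq(0.75, 1.85, 0.65)
    assert(maxFold(sample) == maxOpt(sample))      // same result, truncated to Int
  }
}
```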
core/src/main/scala/kafka/log/LogCleanerManager.scala (16 changes: 5 additions & 11 deletions)

@@ -532,21 +532,15 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
def maintainUncleanablePartitions(): Unit = {

[Review thread on maintainUncleanablePartitions]
Member: ditto. LogCleanerManagerTest is a good place :)
Contributor Author: maintainUncleanablePartitions is invoked by CleanerThread#doWork, which is a daemon thread. It has been widely tested through integration tests and unit tests related to LogCleaner. IMHO, we can leverage these existing tests and don't need to write new ones.

    // Remove deleted partitions from uncleanablePartitions
    inLock(lock) {
-     // Note: we don't use retain or filterInPlace method in this function because retain is deprecated in
-     // scala 2.13 while filterInPlace is not available in scala 2.12.
-
      // Remove deleted partitions
-     uncleanablePartitions.values.foreach {
-       partitions =>
-         val partitionsToRemove = partitions.filterNot(logs.contains).toList
-         partitionsToRemove.foreach { partitions.remove }
+     uncleanablePartitions.values.foreach { partitions =>
+       partitions.filterInPlace(logs.contains)
      }

      // Remove entries with empty partition set.
-     val logDirsToRemove = uncleanablePartitions.filter {
-       case (_, partitions) => partitions.isEmpty
-     }.keys.toList
-     logDirsToRemove.foreach { uncleanablePartitions.remove }
+     uncleanablePartitions.filterInPlace {
+       case (_, partitions) => partitions.nonEmpty
      }
    }
  }

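The change above is possible because the codebase now targets Scala 2.13 only, where filterInPlace mutates the collection directly and replaces both two-step remove loops. A standalone sketch (topic-partition names are illustrative, not Kafka code) of the same pattern:

```scala
import scala.collection.mutable

// Sketch: filterInPlace keeps only elements matching the predicate,
// mutating the collection in place -- first pruning dead partitions
// from each set, then dropping map entries whose set became empty.
object FilterInPlaceSketch {
  def main(args: Array[String]): Unit = {
    val logs = Set("tp-0", "tp-2") // partitions that still exist

    val partitions = mutable.Set("tp-0", "tp-1", "tp-2")
    partitions.filterInPlace(logs.contains) // remove deleted partitions
    assert(partitions == Set("tp-0", "tp-2"))

    val uncleanable = mutable.Map(
      "dir-a" -> mutable.Set("tp-0"),
      "dir-b" -> mutable.Set.empty[String])
    uncleanable.filterInPlace { case (_, ps) => ps.nonEmpty } // drop empty entries
    assert(uncleanable.keySet == Set("dir-a"))
  }
}
```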
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala (28 changes: 28 additions & 0 deletions)

@@ -2045,6 +2045,34 @@ class LogCleanerTest extends Logging {
}
}

@Test
def testMaxOverCleanerThreads(): Unit = {
val logCleaner = new LogCleaner(new CleanerConfig(true),
logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
logs = new Pool[TopicPartition, UnifiedLog](),
logDirFailureChannel = new LogDirFailureChannel(1),
time = time)

val cleaners = mutable.ArrayBuffer[logCleaner.CleanerThread]()

val cleaner1 = new logCleaner.CleanerThread(1)
cleaner1.lastStats = new CleanerStats(time)
cleaner1.lastStats.bufferUtilization = 0.75d

[Review thread]
Member (ijuma, Jan 19, 2025): Nit: d is not needed, 0.75 is already a double. Same for a few more cases below.
Contributor Author: Thank you 😸 I have removed the redundant d but still kept it for the integer.

cleaners += cleaner1

val cleaner2 = new logCleaner.CleanerThread(2)
cleaner2.lastStats = new CleanerStats(time)
cleaner2.lastStats.bufferUtilization = 0.85d
cleaners += cleaner2

val cleaner3 = new logCleaner.CleanerThread(3)
cleaner3.lastStats = new CleanerStats(time)
cleaner3.lastStats.bufferUtilization = 0.65d
cleaners += cleaner3

assertEquals(0, logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization))

[Review threads on the assertion]
Member: Hmm, can we also assert a non-zero value?
Contributor Author (frankvicky, Jan 19, 2025): Sure! I have just added it.
Member: In reviewing this test, I noticed that I introduced a bug in #8783: I round down the double value when updating the metric max-buffer-utilization-percent. I opened https://issues.apache.org/jira/browse/KAFKA-18597 to fix it.
Member: Good catch. I'll backport this one to 4.0 to make it easier to backport the bug fix.
Member: Also, given the fix, perhaps we don't need the case where we set the bufferUtilization to larger numbers.
Member (quoting the comment above): You are right. I copied your comment to #18627.
}
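The rounding bug raised in the review threads above (KAFKA-18597) can be seen in isolation. A sketch, assuming the metric value starts as the raw Double utilization (the scaling helper here is illustrative, not the Kafka fix):

```scala
// Sketch of the truncation pitfall: Double.toInt truncates toward zero,
// so a fractional buffer utilization such as 0.85 reports as 0 unless it
// is scaled to a percentage before the Int conversion.
object TruncationSketch {
  def toPercent(utilization: Double): Int = (utilization * 100).toInt

  def main(args: Array[String]): Unit = {
    val utilization = 0.85
    assert(utilization.toInt == 0)    // raw value truncates to 0
    assert(toPercent(utilization) == 85) // scale to percent first
  }
}
```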

private def writeToLog(log: UnifiedLog, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
for (((key, value), offset) <- keysAndValues.zip(offsetSeq))
yield log.appendAsFollower(messageWithOffset(key, value, offset)).lastOffset