From cafd9b945bf895a91e4680c0d533d2f004bdaa8b Mon Sep 17 00:00:00 2001 From: frankvicky Date: Fri, 17 Jan 2025 20:17:30 +0800 Subject: [PATCH] KAFKA-18559: Cleanup FinalizedFeatures JIRA: KAFKA-18559 Cleanup the zk logic and test in `FinalizedFeatures` --- .../server/metadata/KRaftMetadataCache.scala | 3 +-- .../TransactionCoordinatorConcurrencyTest.scala | 4 +--- .../transaction/TransactionStateManagerTest.scala | 8 ++------ .../scala/unit/kafka/network/ProcessorTest.scala | 6 +++--- .../unit/kafka/network/SocketServerTest.scala | 2 +- .../scala/unit/kafka/server/KafkaApisTest.scala | 6 ++---- .../metadata/publisher/FeaturesPublisher.java | 4 ++-- .../kafka/server/common/FinalizedFeatures.java | 15 ++++----------- .../server/common/FinalizedFeaturesTest.java | 13 +------------ 9 files changed, 17 insertions(+), 44 deletions(-) diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala index 51c05d68c3215..be13635c1ab97 100644 --- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala +++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala @@ -540,8 +540,7 @@ class KRaftMetadataCache( } new FinalizedFeatures(image.features().metadataVersion(), finalizedFeatures, - image.highestOffsetAndEpoch().offset, - true) + image.highestOffsetAndEpoch().offset) } } diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala index 24000894fe9bb..621730bc65eb9 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala @@ -85,9 +85,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren new FinalizedFeatures( MetadataVersion.latestTesting(), Collections.singletonMap(TransactionVersion.FEATURE_NAME, TransactionVersion.TV_2.featureLevel()), - 0, - true - ) + 0) } when(metadataCache.metadataVersion()) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index 41e6b1a954a5f..522461e5485e0 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -70,9 +70,7 @@ class TransactionStateManagerTest { new FinalizedFeatures( MetadataVersion.latestTesting(), Collections.singletonMap(TransactionVersion.FEATURE_NAME, TransactionVersion.TV_2.featureLevel()), - 0, - true - ) + 0) } val metrics = new Metrics() @@ -1332,9 +1330,7 @@ class TransactionStateManagerTest { new FinalizedFeatures( MetadataVersion.latestTesting(), Collections.singletonMap(TransactionVersion.FEATURE_NAME, transactionVersion.featureLevel()), - 0, - true - ) + 0) } val transactionManager = new TransactionStateManager(0, scheduler, replicaManager, metadataCache, txnConfig, time, metrics) diff --git a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala index 94e93d4d2a801..3a862678ca79b 100644 --- a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala +++ b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala @@ -37,7 +37,7 @@ class ProcessorTest { val requestHeader = RequestTestUtils.serializeRequestHeader( new RequestHeader(ApiKeys.INIT_PRODUCER_ID, 0, "clientid", 0)) val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true, - () => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, true)) + () => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0)) val e = assertThrows(classOf[InvalidRequestException], (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): Executable, "INIT_PRODUCER_ID with listener type CONTROLLER should throw InvalidRequestException exception") @@ -55,7 +55,7 @@ class ProcessorTest { .setCorrelationId(0); val requestHeader = RequestTestUtils.serializeRequestHeader(new RequestHeader(requestHeaderData, headerVersion)) val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true, - () => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, true)) + () => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0)) val e = assertThrows(classOf[InvalidRequestException], (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): Executable, "LEADER_AND_ISR should throw InvalidRequestException exception") @@ -67,7 +67,7 @@ class ProcessorTest { val requestHeader = RequestTestUtils.serializeRequestHeader( new RequestHeader(ApiKeys.PRODUCE, 0, "clientid", 0)) val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true, - () => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, true)) + () => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0)) val e = assertThrows(classOf[UnsupportedVersionException], (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): Executable, "PRODUCE v0 should throw UnsupportedVersionException exception") diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 5ebcfd65ccec2..5b2196018e557 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -84,7 +84,7 @@ class SocketServerTest { TestUtils.clearYammerMetrics() private val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true, - () => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, true)) + () => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0)) var server: SocketServer = _ val sockets = new ArrayBuffer[Socket] diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index af26996c56eb2..901309b289ded 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -179,7 +179,7 @@ class KafkaApisTest extends Logging { enabledApis, BrokerFeatures.defaultSupportedFeatures(true), true, - () => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, true)) + () => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0)) when(groupCoordinator.isNewGroupCoordinator).thenReturn(config.isNewGroupCoordinatorEnabled) setupFeatures(featureVersions) @@ -220,9 +220,7 @@ class KafkaApisTest extends Logging { featureVersions.map { featureVersion => featureVersion.featureName -> featureVersion.featureLevel.asInstanceOf[java.lang.Short] }.toMap.asJava, - 0, - true - ) + 0) } case _ => throw new IllegalStateException("Test must set an instance of KRaftMetadataCache") diff --git a/metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java b/metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java index a03f08291b5d8..01572dd941143 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java @@ -57,8 +57,8 @@ public void onMetadataUpdate( if (delta.featuresDelta() != null) { FinalizedFeatures newFinalizedFeatures = new FinalizedFeatures(newImage.features().metadataVersion(), newImage.features().finalizedVersions(), - newImage.provenance().lastContainedOffset(), - true); + newImage.provenance().lastContainedOffset() + ); if (!newFinalizedFeatures.equals(finalizedFeatures)) { log.info("Loaded new metadata {}.", newFinalizedFeatures); finalizedFeatures = newFinalizedFeatures; diff --git a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java index de78a3a72a883..1eb394664094d 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java @@ -27,25 +27,18 @@ public final class FinalizedFeatures { private final long finalizedFeaturesEpoch; public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) { - return new FinalizedFeatures(version, Collections.emptyMap(), -1, true); + return new FinalizedFeatures(version, Collections.emptyMap(), -1); } public FinalizedFeatures( MetadataVersion metadataVersion, Map finalizedFeatures, - long finalizedFeaturesEpoch, - boolean kraftMode + long finalizedFeaturesEpoch ) { - this.metadataVersion = metadataVersion; + this.metadataVersion = Objects.requireNonNull(metadataVersion); this.finalizedFeatures = new HashMap<>(finalizedFeatures); this.finalizedFeaturesEpoch = finalizedFeaturesEpoch; - // In KRaft mode, we always include the metadata version in the features map. - // In ZK mode, we never include it. - if (kraftMode) { - this.finalizedFeatures.put(MetadataVersion.FEATURE_NAME, metadataVersion.featureLevel()); - } else { - this.finalizedFeatures.remove(MetadataVersion.FEATURE_NAME); - } + this.finalizedFeatures.put(MetadataVersion.FEATURE_NAME, metadataVersion.featureLevel()); } public MetadataVersion metadataVersion() { diff --git a/server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java b/server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java index ae6ca998df2da..31d57bedfe585 100644 --- a/server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java @@ -24,27 +24,16 @@ import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME; import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; class FinalizedFeaturesTest { @Test public void testKRaftModeFeatures() { FinalizedFeatures finalizedFeatures = new FinalizedFeatures(MINIMUM_KRAFT_VERSION, - Collections.singletonMap("foo", (short) 2), 123, true); + Collections.singletonMap("foo", (short) 2), 123); assertEquals(MINIMUM_KRAFT_VERSION.featureLevel(), finalizedFeatures.finalizedFeatures().get(FEATURE_NAME)); assertEquals((short) 2, finalizedFeatures.finalizedFeatures().get("foo")); assertEquals(2, finalizedFeatures.finalizedFeatures().size()); } - - @Test - public void testZkModeFeatures() { - FinalizedFeatures finalizedFeatures = new FinalizedFeatures(MINIMUM_KRAFT_VERSION, - Collections.singletonMap("foo", (short) 2), 123, false); - assertNull(finalizedFeatures.finalizedFeatures().get(FEATURE_NAME)); - assertEquals((short) 2, - finalizedFeatures.finalizedFeatures().get("foo")); - assertEquals(1, finalizedFeatures.finalizedFeatures().size()); - } }