Skip to content

Commit 57c2216

Browse files
authored
KAFKA-10478: Allow duplicated ports in advertised.listeners (#9281)
Remove the requirement for unique port numbers for the advertised.listener parameters. This restriction makes for the listeners parameter but there's not reason to apply the same logic for advertised.listeners. Being able to do this opens possibilities for some practical applications when using Kerberos authentication. For example, when configuring Kafka using Kerberos authentication and a Load Balancer we need to have two SASL_SSL listeners: (A) one running with the kafka/hostname principal and (B) another using kafka/lb_name, which is necessary for proper authentication when using the LB FQDN. After bootstrap, though, the client receives the brokers' addresses with the actual host FQDNs advertised by the brokers. To connect to the brokerd using the hostnames the client must connect to the listener A to be able to authenticate successfully with Kerberos. Author: Andre Araujo <[email protected]> Reviewers: Mickael Maison <[email protected]>, Viktor Somogyi-Vass <[email protected]>, Tom Bentley <[email protected]>
1 parent 3bc2df7 commit 57c2216

File tree

3 files changed

+47
-24
lines changed

3 files changed

+47
-24
lines changed

core/src/main/scala/kafka/server/KafkaConfig.scala

+16-12
Original file line numberDiff line numberDiff line change
@@ -642,12 +642,13 @@ object KafkaConfig {
642642
"Use <code>listeners</code> instead. \n" +
643643
"hostname of broker. If this is set, it will only bind to this address. If this is not set, it will bind to all interfaces"
644644
val ListenersDoc = "Listener List - Comma-separated list of URIs we will listen on and the listener names." +
645-
s" If the listener name is not a security protocol, $ListenerSecurityProtocolMapProp must also be set.\n" +
646-
" Specify hostname as 0.0.0.0 to bind to all interfaces.\n" +
647-
" Leave hostname empty to bind to default interface.\n" +
648-
" Examples of legal listener lists:\n" +
649-
" PLAINTEXT://myhost:9092,SSL://:9091\n" +
650-
" CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093\n"
645+
s" If the listener name is not a security protocol, <code>$ListenerSecurityProtocolMapProp<code> must also be set.\n" +
646+
" Listener names and port numbers must be unique.\n" +
647+
" Specify hostname as 0.0.0.0 to bind to all interfaces.\n" +
648+
" Leave hostname empty to bind to default interface.\n" +
649+
" Examples of legal listener lists:\n" +
650+
" PLAINTEXT://myhost:9092,SSL://:9091\n" +
651+
" CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093\n"
651652
val AdvertisedHostNameDoc = "DEPRECATED: only used when <code>advertised.listeners</code> or <code>listeners</code> are not set. " +
652653
"Use <code>advertised.listeners</code> instead. \n" +
653654
"Hostname to publish to ZooKeeper for clients to use. In IaaS environments, this may " +
@@ -659,10 +660,13 @@ object KafkaConfig {
659660
"The port to publish to ZooKeeper for clients to use. In IaaS environments, this may " +
660661
"need to be different from the port to which the broker binds. If this is not set, " +
661662
"it will publish the same port that the broker binds to."
662-
val AdvertisedListenersDoc = "Listeners to publish to ZooKeeper for clients to use, if different than the <code>listeners</code> config property." +
663-
" In IaaS environments, this may need to be different from the interface to which the broker binds." +
664-
" If this is not set, the value for <code>listeners</code> will be used." +
665-
" Unlike <code>listeners</code> it is not valid to advertise the 0.0.0.0 meta-address."
663+
val AdvertisedListenersDoc = s"Listeners to publish to ZooKeeper for clients to use, if different than the <code>$ListenersProp</code> config property." +
664+
" In IaaS environments, this may need to be different from the interface to which the broker binds." +
665+
s" If this is not set, the value for <code>$ListenersProp</code> will be used." +
666+
s" Unlike <code>$ListenersProp</code>, it is not valid to advertise the 0.0.0.0 meta-address.\n" +
667+
s" Also unlike <code>$ListenersProp</code>, there can be duplicated ports in this property," +
668+
" so that one listener can be configured to advertise another listener's address." +
669+
" This can be useful in some cases where external load balancers are used."
666670
val ListenerSecurityProtocolMapDoc = "Map between listener names and security protocols. This must be defined for " +
667671
"the same security protocol to be usable in more than one port or IP. For example, internal and " +
668672
"external traffic can be separated even if SSL is required for both. Concretely, the user could define listeners " +
@@ -1687,9 +1691,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
16871691
def advertisedListeners: Seq[EndPoint] = {
16881692
val advertisedListenersProp = getString(KafkaConfig.AdvertisedListenersProp)
16891693
if (advertisedListenersProp != null)
1690-
CoreUtils.listenerListToEndPoints(advertisedListenersProp, listenerSecurityProtocolMap)
1694+
CoreUtils.listenerListToEndPoints(advertisedListenersProp, listenerSecurityProtocolMap, requireDistinctPorts=false)
16911695
else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null)
1692-
CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort, listenerSecurityProtocolMap)
1696+
CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort, listenerSecurityProtocolMap, requireDistinctPorts=false)
16931697
else
16941698
listeners
16951699
}

core/src/main/scala/kafka/utils/CoreUtils.scala

+8-2
Original file line numberDiff line numberDiff line change
@@ -250,14 +250,20 @@ object CoreUtils {
250250
}
251251

252252
def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol]): Seq[EndPoint] = {
253+
listenerListToEndPoints(listeners, securityProtocolMap, true)
254+
}
255+
256+
def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): Seq[EndPoint] = {
253257
def validate(endPoints: Seq[EndPoint]): Unit = {
254258
// filter port 0 for unit tests
255259
val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
256-
val distinctPorts = portsExcludingZero.distinct
257260
val distinctListenerNames = endPoints.map(_.listenerName).distinct
258261

259-
require(distinctPorts.size == portsExcludingZero.size, s"Each listener must have a different port, listeners: $listeners")
260262
require(distinctListenerNames.size == endPoints.size, s"Each listener must have a different name, listeners: $listeners")
263+
if (requireDistinctPorts) {
264+
val distinctPorts = portsExcludingZero.distinct
265+
require(distinctPorts.size == portsExcludingZero.size, s"Each listener must have a different port, listeners: $listeners")
266+
}
261267
}
262268

263269
val endPoints = try {

core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

+23-10
Original file line numberDiff line numberDiff line change
@@ -218,16 +218,26 @@ class KafkaConfigTest {
218218
props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
219219

220220
// listeners with duplicate port
221-
props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9091,TRACE://localhost:9091")
222-
assertFalse(isValidKafkaConfig(props))
221+
props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9091,SSL://localhost:9091")
222+
var caught = intercept[IllegalArgumentException] { KafkaConfig.fromProps(props) }
223+
assertTrue(caught.getMessage.contains("Each listener must have a different port"))
223224

224-
// listeners with duplicate protocol
225+
// listeners with duplicate name
225226
props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9091,PLAINTEXT://localhost:9092")
226-
assertFalse(isValidKafkaConfig(props))
227+
caught = intercept[IllegalArgumentException] { KafkaConfig.fromProps(props) }
228+
assertTrue(caught.getMessage.contains("Each listener must have a different name"))
229+
230+
// advertised listeners can have duplicate ports
231+
props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "HOST:SASL_SSL,LB:SASL_SSL")
232+
props.put(KafkaConfig.InterBrokerListenerNameProp, "HOST")
233+
props.put(KafkaConfig.ListenersProp, "HOST://localhost:9091,LB://localhost:9092")
234+
props.put(KafkaConfig.AdvertisedListenersProp, "HOST://localhost:9091,LB://localhost:9091")
235+
assertTrue(isValidKafkaConfig(props))
227236

228-
// advertised listeners with duplicate port
229-
props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9091,TRACE://localhost:9091")
230-
assertFalse(isValidKafkaConfig(props))
237+
// but not duplicate names
238+
props.put(KafkaConfig.AdvertisedListenersProp, "HOST://localhost:9091,HOST://localhost:9091")
239+
caught = intercept[IllegalArgumentException] { KafkaConfig.fromProps(props) }
240+
assertTrue(caught.getMessage.contains("Each listener must have a different name"))
231241
}
232242

233243
@Test
@@ -544,9 +554,12 @@ class KafkaConfigTest {
544554
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
545555
props.put(KafkaConfig.ListenersProp, "TRACE://localhost:9091,SSL://localhost:9093")
546556
props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
547-
intercept[IllegalArgumentException] {
548-
KafkaConfig.fromProps(props)
549-
}
557+
var caught = intercept[IllegalArgumentException] { KafkaConfig.fromProps(props) }
558+
assertTrue(caught.getMessage.contains("No security protocol defined for listener TRACE"))
559+
560+
props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,TRACE:PLAINTEXT,SSL:SSL")
561+
caught = intercept[IllegalArgumentException] { KafkaConfig.fromProps(props) }
562+
assertTrue(caught.getMessage.contains("advertised.listeners listener names must be equal to or a subset of the ones defined in listeners"))
550563
}
551564

552565
@Test

0 commit comments

Comments
 (0)