Browse Source

KAFKA-4260; Disallow non-routable address in advertised.listeners

As described in the JIRA ticket, when `listeners=PLAINTEXT://0.0.0.0:9092`
(note the 0.0.0.0 "bind all interfaces" IP address) and
`advertised.listeners` is not specified it defaults to `listeners`,
but it makes no sense to advertise 0.0.0.0 as it's not a routable IP
address.

This patch checks for a 0.0.0.0 host in `advertised.listeners`
(whether via default or not) and fails with a meaningful error if it's
found.

This contribution is my original work and I license the work to the
project under the project's open source license.

Author: Tom Bentley <tbentley@redhat.com>

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #3382 from tombentley/advertised.listeners
pull/3582/merge
Tom Bentley 7 years ago committed by Ismael Juma
parent
commit
ef17d5b892
  1. 8
      core/src/main/scala/kafka/server/KafkaConfig.scala
  2. 8
      core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

8
core/src/main/scala/kafka/server/KafkaConfig.scala

@ -452,9 +452,10 @@ object KafkaConfig { @@ -452,9 +452,10 @@ object KafkaConfig {
"The port to publish to ZooKeeper for clients to use. In IaaS environments, this may " +
"need to be different from the port to which the broker binds. If this is not set, " +
"it will publish the same port that the broker binds to."
val AdvertisedListenersDoc = "Listeners to publish to ZooKeeper for clients to use, if different than the listeners above." +
val AdvertisedListenersDoc = "Listeners to publish to ZooKeeper for clients to use, if different than the `listeners` config property." +
" In IaaS environments, this may need to be different from the interface to which the broker binds." +
" If this is not set, the value for `listeners` will be used."
" If this is not set, the value for `listeners` will be used." +
" Unlike `listeners` it is not valid to advertise the 0.0.0.0 meta-address."
val ListenerSecurityProtocolMapDoc = "Map between listener names and security protocols. This must be defined for " +
"the same security protocol to be usable in more than one port or IP. For example, we can separate internal and " +
"external traffic even if SSL is required for both. Concretely, we could define listeners with names INTERNAL " +
@ -1190,6 +1191,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra @@ -1190,6 +1191,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
s"Found ${advertisedListenerNames.map(_.value).mkString(",")}. The valid options based on the current configuration " +
s"are ${listenerNames.map(_.value).mkString(",")}"
)
require(!advertisedListeners.exists(endpoint => endpoint.host=="0.0.0.0"),
s"${KafkaConfig.AdvertisedListenersProp} cannot use the nonroutable meta-address 0.0.0.0. "+
s"Use a routable IP address.")
require(interBrokerProtocolVersion >= logMessageFormatVersion,
s"log.message.format.version $logMessageFormatVersionString cannot be used when inter.broker.protocol.version is set to $interBrokerProtocolVersionString")
val interBrokerUsesSasl = interBrokerSecurityProtocol == SecurityProtocol.SASL_PLAINTEXT || interBrokerSecurityProtocol == SecurityProtocol.SASL_SSL

8
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

@ -718,6 +718,14 @@ class KafkaConfigTest { @@ -718,6 +718,14 @@ class KafkaConfigTest {
assertEquals(Sensor.RecordingLevel.DEBUG.toString, config.metricRecordingLevel)
}
@Test
def testNonroutableAdvertisedListeners() {
val props = new Properties()
props.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
props.put(KafkaConfig.ListenersProp, "PLAINTEXT://0.0.0.0:9092")
assertFalse(isValidKafkaConfig(props))
}
private def assertPropertyInvalid(validRequiredProps: => Properties, name: String, values: Any*) {
values.foreach((value) => {
val props = validRequiredProps

Loading…
Cancel
Save