diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 89ba641a759..de6559cdad2 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -452,9 +452,10 @@ object KafkaConfig { "The port to publish to ZooKeeper for clients to use. In IaaS environments, this may " + "need to be different from the port to which the broker binds. If this is not set, " + "it will publish the same port that the broker binds to." - val AdvertisedListenersDoc = "Listeners to publish to ZooKeeper for clients to use, if different than the listeners above." + + val AdvertisedListenersDoc = "Listeners to publish to ZooKeeper for clients to use, if different than the `listeners` config property." + " In IaaS environments, this may need to be different from the interface to which the broker binds." + - " If this is not set, the value for `listeners` will be used." + " If this is not set, the value for `listeners` will be used." + + " Unlike `listeners` it is not valid to advertise the 0.0.0.0 meta-address." val ListenerSecurityProtocolMapDoc = "Map between listener names and security protocols. This must be defined for " + "the same security protocol to be usable in more than one port or IP. For example, we can separate internal and " + "external traffic even if SSL is required for both. Concretely, we could define listeners with names INTERNAL " + @@ -1190,6 +1191,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra s"Found ${advertisedListenerNames.map(_.value).mkString(",")}. The valid options based on the current configuration " + s"are ${listenerNames.map(_.value).mkString(",")}" ) + require(!advertisedListeners.exists(endpoint => endpoint.host=="0.0.0.0"), + s"${KafkaConfig.AdvertisedListenersProp} cannot use the nonroutable meta-address 0.0.0.0. "+ + s"Use a routable IP address.") require(interBrokerProtocolVersion >= logMessageFormatVersion, s"log.message.format.version $logMessageFormatVersionString cannot be used when inter.broker.protocol.version is set to $interBrokerProtocolVersionString") val interBrokerUsesSasl = interBrokerSecurityProtocol == SecurityProtocol.SASL_PLAINTEXT || interBrokerSecurityProtocol == SecurityProtocol.SASL_SSL diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index dee6e8700d7..08a45b3eecb 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -718,6 +718,14 @@ class KafkaConfigTest { assertEquals(Sensor.RecordingLevel.DEBUG.toString, config.metricRecordingLevel) } + @Test + def testNonroutableAdvertisedListeners() { + val props = new Properties() + props.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181") + props.put(KafkaConfig.ListenersProp, "PLAINTEXT://0.0.0.0:9092") + assertFalse(isValidKafkaConfig(props)) + } + private def assertPropertyInvalid(validRequiredProps: => Properties, name: String, values: Any*) { values.foreach((value) => { val props = validRequiredProps