diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala index b145adfa11f..bc3602bd69a 100644 --- a/core/src/main/scala/kafka/api/ApiVersion.scala +++ b/core/src/main/scala/kafka/api/ApiVersion.scala @@ -17,6 +17,8 @@ package kafka.api +import org.apache.kafka.common.config.ConfigDef.Validator +import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.record.RecordVersion /** @@ -267,3 +269,14 @@ case object KAFKA_2_1_IV1 extends DefaultApiVersion { val recordVersion = RecordVersion.V2 val id: Int = 18 } + +object ApiVersionValidator extends Validator { + + override def ensureValid(name: String, value: Any): Unit = { + try { + ApiVersion(value.toString) + } catch { + case e: IllegalArgumentException => throw new ConfigException(name, value.toString, e.getMessage) + } + } +} diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index bd4768eda25..d872e09ed79 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -20,7 +20,7 @@ package kafka.log import java.util.{Collections, Locale, Properties} import scala.collection.JavaConverters._ -import kafka.api.ApiVersion +import kafka.api.{ApiVersion, ApiVersionValidator} import kafka.message.BrokerCompressionCodec import kafka.server.{KafkaConfig, ThrottledReplicaListValidator} import kafka.utils.Implicits._ @@ -256,7 +256,7 @@ object LogConfig { MEDIUM, CompressionTypeDoc, KafkaConfig.CompressionTypeProp) .define(PreAllocateEnableProp, BOOLEAN, Defaults.PreAllocateEnable, MEDIUM, PreAllocateEnableDoc, KafkaConfig.LogPreAllocateProp) - .define(MessageFormatVersionProp, STRING, Defaults.MessageFormatVersion, MEDIUM, MessageFormatVersionDoc, + .define(MessageFormatVersionProp, STRING, Defaults.MessageFormatVersion, ApiVersionValidator, MEDIUM, MessageFormatVersionDoc, KafkaConfig.LogMessageFormatVersionProp) .define(MessageTimestampTypeProp, STRING, Defaults.MessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, MessageTimestampTypeDoc, KafkaConfig.LogMessageTimestampTypeProp) diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala index e9598e365ab..f765f51a89a 100644 --- a/core/src/main/scala/kafka/server/AdminManager.scala +++ b/core/src/main/scala/kafka/server/AdminManager.scala @@ -26,7 +26,7 @@ import kafka.utils._ import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.clients.admin.NewPartitions import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource} -import org.apache.kafka.common.errors.{ApiException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, UnknownTopicOrPartitionException} +import org.apache.kafka.common.errors.{ApiException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, UnknownTopicOrPartitionException, InvalidConfigurationException} import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.ListenerName @@ -131,6 +131,9 @@ class AdminManager(val config: KafkaConfig, case e: ApiException => info(s"Error processing create topic request for topic $topic with arguments $arguments", e) CreatePartitionsMetadata(topic, Map(), ApiError.fromThrowable(e)) + case e: ConfigException => + info(s"Error processing create topic request for topic $topic with arguments $arguments", e) + CreatePartitionsMetadata(topic, Map(), ApiError.fromThrowable(new InvalidConfigurationException(e.getMessage, e.getCause))) case e: Throwable => error(s"Error processing create topic request for topic $topic with arguments $arguments", e) CreatePartitionsMetadata(topic, Map(), ApiError.fromThrowable(e)) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 90225024bfc..700f32c534f 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -20,7 +20,7 @@ package kafka.server import java.util import java.util.{Collections, Properties} -import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1} +import kafka.api.{ApiVersion, ApiVersionValidator, KAFKA_0_10_0_IV1} import kafka.cluster.EndPoint import kafka.coordinator.group.OffsetConfig import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager} @@ -890,7 +890,7 @@ object KafkaConfig { .define(NumRecoveryThreadsPerDataDirProp, INT, Defaults.NumRecoveryThreadsPerDataDir, atLeast(1), HIGH, NumRecoveryThreadsPerDataDirDoc) .define(AutoCreateTopicsEnableProp, BOOLEAN, Defaults.AutoCreateTopicsEnable, HIGH, AutoCreateTopicsEnableDoc) .define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), HIGH, MinInSyncReplicasDoc) - .define(LogMessageFormatVersionProp, STRING, Defaults.LogMessageFormatVersion, MEDIUM, LogMessageFormatVersionDoc) + .define(LogMessageFormatVersionProp, STRING, Defaults.LogMessageFormatVersion, ApiVersionValidator, MEDIUM, LogMessageFormatVersionDoc) .define(LogMessageTimestampTypeProp, STRING, Defaults.LogMessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, LogMessageTimestampTypeDoc) .define(LogMessageTimestampDifferenceMaxMsProp, LONG, Defaults.LogMessageTimestampDifferenceMaxMs, MEDIUM, LogMessageTimestampDifferenceMaxMsDoc) .define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, CreateTopicPolicyClassNameDoc) @@ -918,7 +918,7 @@ object KafkaConfig { .define(LeaderImbalanceCheckIntervalSecondsProp, LONG, Defaults.LeaderImbalanceCheckIntervalSeconds, HIGH, LeaderImbalanceCheckIntervalSecondsDoc) .define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable, HIGH, UncleanLeaderElectionEnableDoc) .define(InterBrokerSecurityProtocolProp, STRING, Defaults.InterBrokerSecurityProtocol, MEDIUM, InterBrokerSecurityProtocolDoc) - .define(InterBrokerProtocolVersionProp, STRING, Defaults.InterBrokerProtocolVersion, MEDIUM, InterBrokerProtocolVersionDoc) + .define(InterBrokerProtocolVersionProp, STRING, Defaults.InterBrokerProtocolVersion, ApiVersionValidator, MEDIUM, InterBrokerProtocolVersionDoc) .define(InterBrokerListenerNameProp, STRING, null, MEDIUM, InterBrokerListenerNameDoc) /** ********* Controlled shutdown configuration ***********/ diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala index 13f23e69b53..a469f8efe04 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala @@ -26,6 +26,7 @@ import kafka.admin.TopicCommand.TopicCommandOptions import kafka.utils.ZkUtils.getDeleteTopicPath import org.apache.kafka.common.errors.TopicExistsException import org.apache.kafka.common.internals.Topic +import org.apache.kafka.common.config.ConfigException class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareTest { @@ -240,6 +241,17 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT } catch { case _: Exception => // topic creation should fail due to the invalid config } + + // try to create the topic with another invalid config + try { + val createOpts = new TopicCommandOptions( + Array("--partitions", "1", "--replication-factor", "1", "--topic", "test", + "--config", "message.format.version=boom")) + TopicCommand.createTopic(zkClient, createOpts) + fail("Expected exception on invalid topic-level config.") + } catch { + case _: ConfigException => // topic creation should fail due to the invalid config value + } } @Test diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala index 13a2d23a91c..47c1765b4ad 100644 --- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala @@ -71,6 +71,12 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest { val invalidConfig = Map("not.a.property" -> "error").asJava validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-config" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort, invalidConfig)).asJava, timeout).build(), Map("error-config" -> error(Errors.INVALID_CONFIG)), checkErrorMessage = false) + + val config = Map("message.format.version" -> "invalid-value").asJava + validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder( + Map("error-config-value" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort, config)).asJava, timeout).build(), + Map("error-config-value" -> error(Errors.INVALID_CONFIG)), checkErrorMessage = false) + val invalidAssignments = replicaAssignmentToJava(Map(0 -> List(0, 1), 1 -> List(0))) validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-assignment" -> new CreateTopicsRequest.TopicDetails(invalidAssignments)).asJava, timeout).build(), Map("error-assignment" -> error(Errors.INVALID_REPLICA_ASSIGNMENT)), checkErrorMessage = false)