Browse Source

KAFKA-7409; Validate message format version before creating topics or altering configs (#5651)

Values for `message.format.version` and `log.message.format.version` should be verified before topic creation or config change.

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
pull/5712/head
huxi 6 years ago committed by Jason Gustafson
parent
commit
70d90c3718
  1. 13
      core/src/main/scala/kafka/api/ApiVersion.scala
  2. 4
      core/src/main/scala/kafka/log/LogConfig.scala
  3. 5
      core/src/main/scala/kafka/server/AdminManager.scala
  4. 6
      core/src/main/scala/kafka/server/KafkaConfig.scala
  5. 12
      core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
  6. 6
      core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala

13
core/src/main/scala/kafka/api/ApiVersion.scala

@ -17,6 +17,8 @@ @@ -17,6 +17,8 @@
package kafka.api
import org.apache.kafka.common.config.ConfigDef.Validator
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.record.RecordVersion
/**
@ -267,3 +269,14 @@ case object KAFKA_2_1_IV1 extends DefaultApiVersion { @@ -267,3 +269,14 @@ case object KAFKA_2_1_IV1 extends DefaultApiVersion {
val recordVersion = RecordVersion.V2
val id: Int = 18
}
object ApiVersionValidator extends Validator {
override def ensureValid(name: String, value: Any): Unit = {
try {
ApiVersion(value.toString)
} catch {
case e: IllegalArgumentException => throw new ConfigException(name, value.toString, e.getMessage)
}
}
}

4
core/src/main/scala/kafka/log/LogConfig.scala

@ -20,7 +20,7 @@ package kafka.log @@ -20,7 +20,7 @@ package kafka.log
import java.util.{Collections, Locale, Properties}
import scala.collection.JavaConverters._
import kafka.api.ApiVersion
import kafka.api.{ApiVersion, ApiVersionValidator}
import kafka.message.BrokerCompressionCodec
import kafka.server.{KafkaConfig, ThrottledReplicaListValidator}
import kafka.utils.Implicits._
@ -256,7 +256,7 @@ object LogConfig { @@ -256,7 +256,7 @@ object LogConfig {
MEDIUM, CompressionTypeDoc, KafkaConfig.CompressionTypeProp)
.define(PreAllocateEnableProp, BOOLEAN, Defaults.PreAllocateEnable, MEDIUM, PreAllocateEnableDoc,
KafkaConfig.LogPreAllocateProp)
.define(MessageFormatVersionProp, STRING, Defaults.MessageFormatVersion, MEDIUM, MessageFormatVersionDoc,
.define(MessageFormatVersionProp, STRING, Defaults.MessageFormatVersion, ApiVersionValidator, MEDIUM, MessageFormatVersionDoc,
KafkaConfig.LogMessageFormatVersionProp)
.define(MessageTimestampTypeProp, STRING, Defaults.MessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, MessageTimestampTypeDoc,
KafkaConfig.LogMessageTimestampTypeProp)

5
core/src/main/scala/kafka/server/AdminManager.scala

@ -26,7 +26,7 @@ import kafka.utils._ @@ -26,7 +26,7 @@ import kafka.utils._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.NewPartitions
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource}
import org.apache.kafka.common.errors.{ApiException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.errors.{ApiException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, UnknownTopicOrPartitionException, InvalidConfigurationException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
@ -131,6 +131,9 @@ class AdminManager(val config: KafkaConfig, @@ -131,6 +131,9 @@ class AdminManager(val config: KafkaConfig,
case e: ApiException =>
info(s"Error processing create topic request for topic $topic with arguments $arguments", e)
CreatePartitionsMetadata(topic, Map(), ApiError.fromThrowable(e))
case e: ConfigException =>
info(s"Error processing create topic request for topic $topic with arguments $arguments", e)
CreatePartitionsMetadata(topic, Map(), ApiError.fromThrowable(new InvalidConfigurationException(e.getMessage, e.getCause)))
case e: Throwable =>
error(s"Error processing create topic request for topic $topic with arguments $arguments", e)
CreatePartitionsMetadata(topic, Map(), ApiError.fromThrowable(e))

6
core/src/main/scala/kafka/server/KafkaConfig.scala

@ -20,7 +20,7 @@ package kafka.server @@ -20,7 +20,7 @@ package kafka.server
import java.util
import java.util.{Collections, Properties}
import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1}
import kafka.api.{ApiVersion, ApiVersionValidator, KAFKA_0_10_0_IV1}
import kafka.cluster.EndPoint
import kafka.coordinator.group.OffsetConfig
import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager}
@ -890,7 +890,7 @@ object KafkaConfig { @@ -890,7 +890,7 @@ object KafkaConfig {
.define(NumRecoveryThreadsPerDataDirProp, INT, Defaults.NumRecoveryThreadsPerDataDir, atLeast(1), HIGH, NumRecoveryThreadsPerDataDirDoc)
.define(AutoCreateTopicsEnableProp, BOOLEAN, Defaults.AutoCreateTopicsEnable, HIGH, AutoCreateTopicsEnableDoc)
.define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), HIGH, MinInSyncReplicasDoc)
.define(LogMessageFormatVersionProp, STRING, Defaults.LogMessageFormatVersion, MEDIUM, LogMessageFormatVersionDoc)
.define(LogMessageFormatVersionProp, STRING, Defaults.LogMessageFormatVersion, ApiVersionValidator, MEDIUM, LogMessageFormatVersionDoc)
.define(LogMessageTimestampTypeProp, STRING, Defaults.LogMessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, LogMessageTimestampTypeDoc)
.define(LogMessageTimestampDifferenceMaxMsProp, LONG, Defaults.LogMessageTimestampDifferenceMaxMs, MEDIUM, LogMessageTimestampDifferenceMaxMsDoc)
.define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, CreateTopicPolicyClassNameDoc)
@ -918,7 +918,7 @@ object KafkaConfig { @@ -918,7 +918,7 @@ object KafkaConfig {
.define(LeaderImbalanceCheckIntervalSecondsProp, LONG, Defaults.LeaderImbalanceCheckIntervalSeconds, HIGH, LeaderImbalanceCheckIntervalSecondsDoc)
.define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable, HIGH, UncleanLeaderElectionEnableDoc)
.define(InterBrokerSecurityProtocolProp, STRING, Defaults.InterBrokerSecurityProtocol, MEDIUM, InterBrokerSecurityProtocolDoc)
.define(InterBrokerProtocolVersionProp, STRING, Defaults.InterBrokerProtocolVersion, MEDIUM, InterBrokerProtocolVersionDoc)
.define(InterBrokerProtocolVersionProp, STRING, Defaults.InterBrokerProtocolVersion, ApiVersionValidator, MEDIUM, InterBrokerProtocolVersionDoc)
.define(InterBrokerListenerNameProp, STRING, null, MEDIUM, InterBrokerListenerNameDoc)
/** ********* Controlled shutdown configuration ***********/

12
core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala

@ -26,6 +26,7 @@ import kafka.admin.TopicCommand.TopicCommandOptions @@ -26,6 +26,7 @@ import kafka.admin.TopicCommand.TopicCommandOptions
import kafka.utils.ZkUtils.getDeleteTopicPath
import org.apache.kafka.common.errors.TopicExistsException
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.config.ConfigException
class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
@ -240,6 +241,17 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT @@ -240,6 +241,17 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
} catch {
case _: Exception => // topic creation should fail due to the invalid config
}
// try to create the topic with another invalid config
try {
val createOpts = new TopicCommandOptions(
Array("--partitions", "1", "--replication-factor", "1", "--topic", "test",
"--config", "message.format.version=boom"))
TopicCommand.createTopic(zkClient, createOpts)
fail("Expected exception on invalid topic-level config.")
} catch {
case _: ConfigException => // topic creation should fail due to the invalid config value
}
}
@Test

6
core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala

@ -71,6 +71,12 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest { @@ -71,6 +71,12 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
val invalidConfig = Map("not.a.property" -> "error").asJava
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-config" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort, invalidConfig)).asJava, timeout).build(),
Map("error-config" -> error(Errors.INVALID_CONFIG)), checkErrorMessage = false)
val config = Map("message.format.version" -> "invalid-value").asJava
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
Map("error-config-value" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort, config)).asJava, timeout).build(),
Map("error-config-value" -> error(Errors.INVALID_CONFIG)), checkErrorMessage = false)
val invalidAssignments = replicaAssignmentToJava(Map(0 -> List(0, 1), 1 -> List(0)))
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-assignment" -> new CreateTopicsRequest.TopicDetails(invalidAssignments)).asJava, timeout).build(),
Map("error-assignment" -> error(Errors.INVALID_REPLICA_ASSIGNMENT)), checkErrorMessage = false)

Loading…
Cancel
Save