
KAFKA-4340; Follow-up fixing system test failures and handling non default log.retention.ms

Author: Jiangjie Qin <becket.qin@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2544 from becketqin/KAFKA-4340_follow_up
Authored by Jiangjie Qin, committed by Ismael Juma 8 years ago
commit 1f2ee5f0a9
1. core/src/main/scala/kafka/log/LogValidator.scala (6 changed lines)
2. core/src/main/scala/kafka/server/ConfigHandler.scala (13 changed lines)
3. core/src/main/scala/kafka/server/KafkaConfig.scala (15 changed lines)
4. core/src/main/scala/kafka/server/KafkaServer.scala (2 changed lines)
5. core/src/test/scala/unit/kafka/log/LogTest.scala (28 changed lines)
6. core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala (2 changed lines)
7. docs/upgrade.html (39 changed lines)

core/src/main/scala/kafka/log/LogValidator.scala (6 changed lines)

@@ -235,9 +235,11 @@ private[kafka] object LogValidator {
  now: Long,
  timestampType: TimestampType,
  timestampDiffMaxMs: Long) {
- if (timestampType == TimestampType.CREATE_TIME && math.abs(record.timestamp - now) > timestampDiffMaxMs)
+ if (timestampType == TimestampType.CREATE_TIME
+     && record.timestamp != Record.NO_TIMESTAMP
+     && math.abs(record.timestamp - now) > timestampDiffMaxMs)
  throw new InvalidTimestampException(s"Timestamp ${record.timestamp} of message is out of range. " +
-   s"The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}")
+   s"The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}]")
  if (record.timestampType == TimestampType.LOG_APPEND_TIME)
  throw new InvalidTimestampException(s"Invalid timestamp type in message $record. Producer should not set " +
    s"timestamp type to LogAppendTime.")

core/src/main/scala/kafka/server/ConfigHandler.scala (13 changed lines)

@@ -65,7 +65,7 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC
  && logConfig.retentionMs < logConfig.messageTimestampDifferenceMaxMs)
  warn(s"${LogConfig.RetentionMsProp} for topic $topic is set to ${logConfig.retentionMs}. It is smaller than " +
    s"${LogConfig.MessageTimestampDifferenceMaxMsProp}'s value ${logConfig.messageTimestampDifferenceMaxMs}. " +
-   s"This may result in potential frequent log rolling.")
+   s"This may result in frequent log rolling.")
  logs.foreach(_.config = logConfig)
  }
@@ -98,16 +98,15 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC
  }
  def excludedConfigs(topic: String, topicConfig: Properties): Set[String] = {
-   val excludeConfigs: mutable.Set[String] = new mutable.HashSet[String]
  // Verify message format version
-   Option(topicConfig.getProperty(LogConfig.MessageFormatVersionProp)).foreach { versionString =>
+   Option(topicConfig.getProperty(LogConfig.MessageFormatVersionProp)).flatMap { versionString =>
  if (kafkaConfig.interBrokerProtocolVersion < ApiVersion(versionString)) {
  warn(s"Log configuration ${LogConfig.MessageFormatVersionProp} is ignored for `$topic` because `$versionString` " +
    s"is not compatible with Kafka inter-broker protocol version `${kafkaConfig.interBrokerProtocolVersionString}`")
-     excludeConfigs += LogConfig.MessageFormatVersionProp
-   }
-   }
-   excludeConfigs.toSet
+     Some(LogConfig.MessageFormatVersionProp)
+   } else
+     None
+   }.toSet
  }
  }
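The refactor above replaces the mutable HashSet plus foreach with a single Option.flatMap whose result is converted to a Set. The same idiom in isolation, with a hypothetical `incompatible` predicate standing in for the inter-broker version check:

    def excluded(maybeVersion: Option[String], incompatible: String => Boolean): Set[String] =
      maybeVersion.flatMap { v =>
        if (incompatible(v)) Some("message.format.version") else None
      }.toSet

    // excluded(Some("0.9.0"), _ => true) == Set("message.format.version")
    // excluded(None, _ => true)          == Set.empty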

core/src/main/scala/kafka/server/KafkaConfig.scala (15 changed lines)

@@ -17,6 +17,7 @@
  package kafka.server
+ import java.util
  import java.util.Properties
  import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1}
@@ -479,7 +480,8 @@ object KafkaConfig {
  val LogMessageTimestampDifferenceMaxMsDoc = "The maximum difference allowed between the timestamp when a broker receives " +
    "a message and the timestamp specified in the message. If log.message.timestamp.type=CreateTime, a message will be rejected " +
    "if the difference in timestamp exceeds this threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime." +
-   "The maximum timestamp difference allowed should be no greater than log.retention.ms to avoid unnecessarily frequent log rolling."
+   "The maximum timestamp difference allowed should be no greater than log.retention.ms to avoid unnecessarily frequent log rolling. For " +
+   "this reason, the default is the value of log.retention.ms."
  val NumRecoveryThreadsPerDataDirDoc = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown"
  val AutoCreateTopicsEnableDoc = "Enable auto creation of topic on the server"
  val MinInSyncReplicasDoc = "When a producer sets acks to \"all\" (or \"-1\"), " +
@@ -690,7 +692,7 @@ object KafkaConfig {
  .define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), HIGH, MinInSyncReplicasDoc)
  .define(LogMessageFormatVersionProp, STRING, Defaults.LogMessageFormatVersion, MEDIUM, LogMessageFormatVersionDoc)
  .define(LogMessageTimestampTypeProp, STRING, Defaults.LogMessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, LogMessageTimestampTypeDoc)
- .define(LogMessageTimestampDifferenceMaxMsProp, LONG, Defaults.LogMessageTimestampDifferenceMaxMs, atLeast(0), MEDIUM, LogMessageTimestampDifferenceMaxMsDoc)
+ .define(LogMessageTimestampDifferenceMaxMsProp, LONG, null, MEDIUM, LogMessageTimestampDifferenceMaxMsDoc)
  .define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, CreateTopicPolicyClassNameDoc)
  /** ********* Replication configuration ***********/
@@ -888,7 +890,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
  val logMessageFormatVersionString = getString(KafkaConfig.LogMessageFormatVersionProp)
  val logMessageFormatVersion = ApiVersion(logMessageFormatVersionString)
  val logMessageTimestampType = TimestampType.forName(getString(KafkaConfig.LogMessageTimestampTypeProp))
- val logMessageTimestampDifferenceMaxMs = getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)
+ val logMessageTimestampDifferenceMaxMs = getMessageTimestampDifferenceMaxMs
  /** ********* Replication configuration ***********/
  val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp)
@@ -998,6 +1000,13 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
  millis
  }
+ private def getMessageTimestampDifferenceMaxMs: Long = {
+   Option(getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)) match {
+     case Some(value) => value
+     case None => getLogRetentionTimeMillis
+   }
+ }
  private def getMap(propName: String, propValue: String): Map[String, String] = {
  try {
  CoreUtils.parseCsvMap(propValue)
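Taken together, the KafkaConfig changes above make log.message.timestamp.difference.max.ms default to the broker's log.retention.ms when it is not set explicitly: with a null default, getLong returns null for an unset property, and the Option fallback picks up the retention time. A sketch of observing this, mirroring the assertion added to KafkaConfigTest further down; the property names are the real broker configs, the values are illustrative:

    import java.util.Properties
    import kafka.server.KafkaConfig

    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181")
    props.put("log.retention.ms", "604800000")  // 7 days, illustrative
    // log.message.timestamp.difference.max.ms is left unset, so it falls back to log.retention.ms
    val config = KafkaConfig.fromProps(props)
    assert(config.logMessageTimestampDifferenceMaxMs == config.logRetentionTimeMillis)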

core/src/main/scala/kafka/server/KafkaServer.scala (2 changed lines)

@@ -76,7 +76,7 @@ object KafkaServer {
  logProps.put(LogConfig.PreAllocateEnableProp, kafkaConfig.logPreAllocateEnable)
  logProps.put(LogConfig.MessageFormatVersionProp, kafkaConfig.logMessageFormatVersion.version)
  logProps.put(LogConfig.MessageTimestampTypeProp, kafkaConfig.logMessageTimestampType.name)
- logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, kafkaConfig.logMessageTimestampDifferenceMaxMs)
+ logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, kafkaConfig.logMessageTimestampDifferenceMaxMs: java.lang.Long)
  logProps
  }
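The only change in KafkaServer is the explicit ascription to java.lang.Long. The property map holds AnyRef values, and a Scala Long is a value type that the compiler will not implicitly box where AnyRef is expected, so the Java boxed type must be named explicitly (the same reason the tests write 100: java.lang.Integer). A minimal illustration, assuming nothing beyond the standard library:

    val props = new java.util.HashMap[String, Object]()
    val maxDiffMs: Long = 86400000L
    // props.put("message.timestamp.difference.max.ms", maxDiffMs)              // does not compile: Long is not an AnyRef
    props.put("message.timestamp.difference.max.ms", maxDiffMs: java.lang.Long) // boxed explicitly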

core/src/test/scala/unit/kafka/log/LogTest.scala (28 changed lines)

@@ -354,7 +354,7 @@ class LogTest extends JUnitSuite {
  logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
  val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
  val numMessages = 100
- val messageSets = (0 until numMessages).map(i => TestUtils.singletonRecords(value = i.toString.getBytes,
+ val messageSets = (0 until numMessages).map(i => TestUtils.singletonRecords(value = i.toString.getBytes,
    timestamp = time.milliseconds))
  messageSets.foreach(log.append(_))
  log.flush()
@@ -389,11 +389,11 @@ class LogTest extends JUnitSuite {
  val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
  /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
- log.append(MemoryRecords.withRecords(CompressionType.GZIP,
-   Record.create(time.milliseconds, null, "hello".getBytes),
+ log.append(MemoryRecords.withRecords(CompressionType.GZIP,
+   Record.create(time.milliseconds, null, "hello".getBytes),
    Record.create(time.milliseconds, null, "there".getBytes)))
- log.append(MemoryRecords.withRecords(CompressionType.GZIP,
-   Record.create(time.milliseconds, null, "alpha".getBytes),
+ log.append(MemoryRecords.withRecords(CompressionType.GZIP,
+   Record.create(time.milliseconds, null, "alpha".getBytes),
    Record.create(time.milliseconds, null, "beta".getBytes)))
  def read(offset: Int) = log.read(offset, 4096).records.deepEntries.iterator
@@ -445,7 +445,7 @@ class LogTest extends JUnitSuite {
  */
  @Test
  def testMessageSetSizeCheck() {
- val messageSet = MemoryRecords.withRecords(Record.create(time.milliseconds, null, "You".getBytes),
+ val messageSet = MemoryRecords.withRecords(Record.create(time.milliseconds, null, "You".getBytes),
    Record.create(time.milliseconds, null, "bethe".getBytes))
  // append messages to log
  val configSegmentSize = messageSet.sizeInBytes - 1
@@ -513,10 +513,10 @@ class LogTest extends JUnitSuite {
  */
  @Test
  def testMessageSizeCheck() {
- val first = MemoryRecords.withRecords(CompressionType.NONE,
-   Record.create(time.milliseconds, null, "You".getBytes),
+ val first = MemoryRecords.withRecords(CompressionType.NONE,
+   Record.create(time.milliseconds, null, "You".getBytes),
    Record.create(time.milliseconds, null, "bethe".getBytes))
- val second = MemoryRecords.withRecords(CompressionType.NONE,
+ val second = MemoryRecords.withRecords(CompressionType.NONE,
    Record.create(time.milliseconds, null, "change (I need more bytes)".getBytes))
  // append messages to log
@@ -989,6 +989,16 @@ class LogTest extends JUnitSuite {
  log.append(invalidMessage, assignOffsets = false)
  }
+ @Test
+ def testAppendWithNoTimestamp(): Unit = {
+   val log = new Log(logDir,
+     LogConfig(),
+     recoveryPoint = 0L,
+     time.scheduler,
+     time)
+   log.append(MemoryRecords.withRecords(Record.create(Record.NO_TIMESTAMP, "key".getBytes, "value".getBytes)))
+ }
  @Test
  def testCorruptLog() {
  // append some messages to create some segments

core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala (2 changed lines)

@@ -586,6 +586,7 @@ class KafkaConfigTest {
  case KafkaConfig.LogFlushIntervalMessagesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
  case KafkaConfig.LogFlushSchedulerIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
  case KafkaConfig.LogFlushIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+ case KafkaConfig.LogMessageTimestampDifferenceMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
  case KafkaConfig.NumRecoveryThreadsPerDataDirProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
  case KafkaConfig.AutoCreateTopicsEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
  case KafkaConfig.MinInSyncReplicasProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
@@ -702,6 +703,7 @@ class KafkaConfigTest {
  assertEquals(12 * 60L * 1000L * 60, config.logRollTimeMillis)
  assertEquals(11 * 60L * 1000L * 60, config.logRollTimeJitterMillis)
  assertEquals(10 * 60L * 1000L * 60, config.logRetentionTimeMillis)
+ assertEquals(config.logRetentionTimeMillis, config.logMessageTimestampDifferenceMaxMs)
  assertEquals(123L, config.logFlushIntervalMs)
  assertEquals(SnappyCompressionCodec, config.offsetsTopicCompressionCodec)
  assertEquals(Sensor.RecordingLevel.DEBUG.toString, config.metricRecordingLevel)

docs/upgrade.html (39 changed lines)

@@ -15,6 +15,44 @@
  limitations under the License.
  -->
+ <h4><a id="upgrade_10_3_0" href="#upgrade_10_3_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x or 0.10.2.x to 0.10.3.0</a></h4>
+ <p>0.10.3.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade.
+ However, please review the <a href="#upgrade_1030_notable">notable changes in 0.10.3.0</a> before upgrading.
+ </p>
+ <p>Starting with version 0.10.2, Java clients (producer and consumer) have acquired the ability to communicate with older brokers. Version 0.10.3
+ clients can talk to version 0.10.0 or newer brokers. However, if your brokers are older than 0.10.0, you must upgrade all the brokers in the
+ Kafka cluster before upgrading your clients. Version 0.10.3 brokers support 0.8.x and newer clients.
+ </p>
+ <p><b>For a rolling upgrade:</b></p>
+ <ol>
+ <li> Update server.properties file on all brokers and add the following properties:
+ <ul>
+ <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0, 0.10.1 or 0.10.2).</li>
+ <li>log.message.format.version=CURRENT_KAFKA_VERSION (See <a href="#upgrade_10_performance_impact">potential performance impact following the upgrade</a> for the details on what this configuration does.)
+ </ul>
+ </li>
+ <li> Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. </li>
+ <li> Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.3. </li>
+ <li> If your previous message format is 0.10.0, change log.message.format.version to 0.10.3 (this is a no-op as the message format is the same for 0.10.0, 0.10.1, 0.10.2 and 0.10.3).
+ If your previous message format version is lower than 0.10.0, do not change log.message.format.version yet - this parameter should only change once all consumers have been upgraded to 0.10.0.0 or later.</li>
+ <li> Restart the brokers one by one for the new protocol version to take effect. </li>
+ <li> If log.message.format.version is still lower than 0.10.0 at this point, wait until all consumers have been upgraded to 0.10.0 or later,
+ then change log.message.format.version to 0.10.3 on each broker and restart them one by one. </li>
+ </ol>
+ <p><b>Note:</b> If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.
+ <p><b>Note:</b> Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.
+ <h5><a id="upgrade_1030_notable" href="#upgrade_1030_notable">Notable changes in 0.10.3.0</a></h5>
+ <ul>
+ <li>The <code>offsets.topic.replication.factor</code> broker config is now enforced upon auto topic creation. Internal auto topic creation will fail with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets this replication factor requirement.</li>
+ <li>By default <code>message.timestamp.difference.max.ms</code> is the same as <code>retention.ms</code> instead of <code>Long.MAX_VALUE</code>.</li>
+ </ul>
  <h4><a id="upgrade_10_2_0" href="#upgrade_10_2_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x or 0.10.1.x to 0.10.2.0</a></h4>
  <p>0.10.2.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade.
  However, please review the <a href="#upgrade_1020_notable">notable changes in 0.10.2.0</a> before upgrading.
@@ -75,7 +113,6 @@ Kafka cluster before upgrading your clients. Version 0.10.2 brokers support 0.8.
  should not be set in the Streams app any more. If the Kafka cluster is secured, Streams apps must have the required security privileges to create new topics.</li>
  <li>Several new fields including "security.protocol", "connections.max.idle.ms", "retry.backoff.ms", "reconnect.backoff.ms" and "request.timeout.ms" were added to
  StreamsConfig class. User should pay attention to the default values and set these if needed. For more details please refer to <a href="/{{version}}/documentation/#streamsconfigs">3.5 Kafka Streams Configs</a>.</li>
- <li>The <code>offsets.topic.replication.factor</code> broker config is now enforced upon auto topic creation. Internal auto topic creation will fail with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets this replication factor requirement.</li>
  </ul>
  <h5><a id="upgrade_1020_new_protocols" href="#upgrade_1020_new_protocols">New Protocol Versions</a></h5>
