<li>Topic deletion is now enabled by default, since the functionality is now stable. Users who wish to
to retain the previous behavior should set the broker config <code>delete.topic.enable</code> to <code>false</code>. Keep in mind that topic deletion removes data and the operation is not reversible (i.e. there is no "undelete" operation)</li>
<li>For topics that support timestamp search if no offset can be found for a partition, that partition is now included in the search result with a null offset value. Previously, the partition was not included in the map.
This change was made to make the search behavior consistent with the case of topics not supporting timestamp search.
on live log directories even if there are offline log directories. A log directory may become offline due to IOException
caused by hardware failure. Users need to monitor the per-broker metric <code>offlineLogDirectoryCount</code> to check
whether there is offline log directory. </li>
<li>Added KafkaStorageException which is a retriable exception. KafkaStorageException will be converted to NotLeaderForPartitionException in the response
if the version of client's FetchRequest or ProducerRequest does not support KafkaStorageException. </li>
<li>The deprecated tool <code>kafka-consumer-offset-checker.sh</code> has been removed. Use <code>kafka-consumer-groups.sh</code> to get consumer group details.</li>
<h5><aid="upgrade_100_streams"href="#upgrade_100_streams">Upgrading a 1.0.0 Kafka Streams Application</a></h5>
<ul>
<li> Upgrading your Streams application from 0.11.0 to 1.0.0 does not require a broker upgrade.
A Kafka Streams 1.0.0 application can connect to 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though). </li>
<li> If you are monitoring on streams metrics, you will need make some changes to the metrics names in your reporting and monitoring code, because the metrics sensor hierarchy was changed. </li>
<li> There are a few public APIs including <code>ProcessorContext#schedule()</code>, <code>Processor#punctuate()</code> and <code>KStreamBuilder</code>, <code>TopologyBuilder</code> are being deprecated by new APIs.
<p>Kafka 0.11.0.0 introduces a new message format version as well as wire protocol changes. By following the recommended rolling upgrade plan below,
you guarantee no downtime during the upgrade. However, please review the <ahref="#upgrade_1100_notable">notable changes in 0.11.0.0</a> before upgrading.
<li> Once the entire cluster is upgraded, bump the protocol version by editing <code>inter.broker.protocol.version</code> and setting it to 0.11.0, but
do not change <code>log.message.format.version</code> yet. </li>
<li>Added user headers support through a new Headers interface providing user headers read and write access.</li>
<li>ProducerRecord and ConsumerRecord expose the new Headers API via <code>Headers headers()</code> method call.</li>
<li>ExtendedSerializer and ExtendedDeserializer interfaces are introduced to support serialization and deserialization for headers. Headers will be ignored if the configured serializer and deserializer are not the above classes.</li>
This config specifies the time, in milliseconds, that the <code>GroupCoordinator</code> will delay the initial consumer rebalance.
The rebalance will be further delayed by the value of <code>group.initial.rebalance.delay.ms</code> as new members join the group, up to a maximum of <code>max.poll.interval.ms</code>.
<li><code>org.apache.kafka.common.Cluster#partitionsForTopic</code>, <code>partitionsForNode</code> and <code>availablePartitionsForTopic</code> methods
will return an empty list instead of <code>null</code> (which is considered a bad practice) in case the metadata for the required topic does not exist.
<li><ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers">KIP-82</a>: ProduceRequest v3 introduces an array of <code>header</code> in the message protocol, containing <code>key</code> field and <code>value</code> field.</li>
<li><ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers">KIP-82</a>: FetchResponse v5 introduces an array of <code>header</code> in the message protocol, containing <code>key</code> field and <code>value</code> field.</li>
<h5><aid="upgrade_11_message_format"href="#upgrade_11_message_format">Notes on the new message format in 0.11.0</a></h5>
<p>The 0.11.0 message format includes several major enhancements in order to support better delivery semantics for the producer
(see <ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging">KIP-98</a>)
and improved replication fault tolerance
(see <ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation">KIP-101</a>).
Although the new format contains more information to make these improvements possible, we have made the batch format much
more efficient. As long as the number of messages per batch is more than 2, you can expect lower overall overhead. For smaller
batches, however, there may be a small performance impact. See <ahref="bit.ly/kafka-eos-perf">here</a> for the results of our
initial performance analysis of the new message format. You can also find more detail on the message format in the
<li> Update server.properties file on all brokers and add the following properties:
<ul>
<li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0 or 0.10.1).</li>
<li>log.message.format.version=CURRENT_KAFKA_VERSION (See <ahref="#upgrade_10_performance_impact">potential performance impact following the upgrade</a> for the details on what this configuration does.)
</ul>
</li>
<li> Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. </li>
<li> Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.2. </li>
<li> If your previous message format is 0.10.0, change log.message.format.version to 0.10.2 (this is a no-op as the message format is the same for 0.10.0, 0.10.1 and 0.10.2).
If your previous message format version is lower than 0.10.0, do not change log.message.format.version yet - this parameter should only change once all consumers have been upgraded to 0.10.0.0 or later.</li>
<li> Restart the brokers one by one for the new protocol version to take effect. </li>
<li> If log.message.format.version is still lower than 0.10.0 at this point, wait until all consumers have been upgraded to 0.10.0 or later,
then change log.message.format.version to 0.10.2 on each broker and restart them one by one. </li>
</ol>
<p><b>Note:</b> If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.
<p><b>Note:</b> Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.
<li> Upgrading your Streams application from 0.10.1 to 0.10.2 does not require a broker upgrade.
A Kafka Streams 0.10.2 application can connect to 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though). </li>
<li> You need to recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
<li> If you use a custom (i.e., user implemented) timestamp extractor, you will need to update this code, because the <code>TimestampExtractor</code> interface was changed. </li>
<li> If you register custom metrics, you will need to update this code, because the <code>StreamsMetric</code> interface was changed. </li>
<li> The default values for two configurations of the StreamsConfig class were changed to improve the resiliency of Kafka Streams applications. The internal Kafka Streams producer <code>retries</code> default value was changed from 0 to 10. The internal Kafka Streams consumer <code>max.poll.interval.ms</code> default value was changed from 300000 to <code>Integer.MAX_VALUE</code>.
<li>The Zookeeper dependency was removed from the Streams API. The Streams API now uses the Kafka protocol to manage internal topics instead of
modifying Zookeeper directly. This eliminates the need for privileges to access Zookeeper directly and "StreamsConfig.ZOOKEEPER_CONFIG"
should not be set in the Streams app any more. If the Kafka cluster is secured, Streams apps must have the required security privileges to create new topics.</li>
<li>Several new fields including "security.protocol", "connections.max.idle.ms", "retry.backoff.ms", "reconnect.backoff.ms" and "request.timeout.ms" were added to
StreamsConfig class. User should pay attention to the default values and set these if needed. For more details please refer to <ahref="/{{version}}/documentation/#streamsconfigs">3.5 Kafka Streams Configs</a>.</li>
<li><ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-88%3A+OffsetFetch+Protocol+Update">KIP-88</a>: OffsetFetchRequest v2 supports retrieval of offsets for all topics if the <code>topics</code> array is set to <code>null</code>. </li>
<li><ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-88%3A+OffsetFetch+Protocol+Update">KIP-88</a>: OffsetFetchResponse v2 introduces a top-level <code>error_code</code> field. </li>
<li><ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-103%3A+Separation+of+Internal+and+External+traffic">KIP-103</a>: UpdateMetadataRequest v3 introduces a <code>listener_name</code> field to the elements of the <code>end_points</code> array. </li>
<li><ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-108%3A+Create+Topic+Policy">KIP-108</a>: CreateTopicsRequest v1 introduces a <code>validate_only</code> field. </li>
<li><ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-108%3A+Create+Topic+Policy">KIP-108</a>: CreateTopicsResponse v1 introduces an <code>error_message</code> field to the elements of the <code>topic_errors</code> array. </li>
<li>log.message.format.version=CURRENT_KAFKA_VERSION (See <ahref="#upgrade_10_performance_impact">potential performance impact following the upgrade</a> for the details on what this configuration does.)
<li> Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. </li>
<li> Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.1.0. </li>
<li> If your previous message format is 0.10.0, change log.message.format.version to 0.10.1 (this is a no-op as the message format is the same for both 0.10.0 and 0.10.1).
If your previous message format version is lower than 0.10.0, do not change log.message.format.version yet - this parameter should only change once all consumers have been upgraded to 0.10.0.0 or later.</li>
<p><b>Note:</b> If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.
<p><b>Note:</b> Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.
<h5><aid="upgrade_10_1_breaking"href="#upgrade_10_1_breaking">Potential breaking changes in 0.10.1.0</a></h5>
<ul>
<li> The log retention time is no longer based on last modified time of the log segments. Instead it will be based on the largest timestamp of the messages in a log segment.</li>
<li> The log rolling time is no longer depending on log segment create time. Instead it is now based on the timestamp in the messages. More specifically. if the timestamp of the first message in the segment is T, the log will be rolled out when a new message has a timestamp greater than or equal to T + log.roll.ms </li>
<li> The open file handlers of 0.10.0 will increase by ~33% because of the addition of time index files for each segment.</li>
<li> The time index and offset index share the same index size configuration. Since each time index entry is 1.5x the size of offset index entry. User may need to increase log.index.size.max.bytes to avoid potential frequent log rolling. </li>
<li> Due to the increased number of index files, on some brokers with large amount the log segments (e.g. >15K), the log loading process during the broker startup could be longer. Based on our experiment, setting the num.recovery.threads.per.data.dir to one may reduce the log loading time. </li>
<li> Upgrading your Streams application from 0.10.0 to 0.10.1 does require a <ahref="#upgrade_10_1">broker upgrade</a> because a Kafka Streams 0.10.1 application can only connect to 0.10.1 brokers. </li>
<li> There are couple of API changes, that are not backward compatible (cf. <ahref="/{{version}}/documentation/streams#streams_api_changes_0101">Streams API changes in 0.10.1</a> for more details).
Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
<li> The new Java consumer is no longer in beta and we recommend it for all new development. The old Scala consumers are still supported, but they will be deprecated in the next release
and will be removed in a future major release. </li>
<li> The <code>--new-consumer</code>/<code>--new.consumer</code> switch is no longer required to use tools like MirrorMaker and the Console Consumer with the new consumer; one simply
needs to pass a Kafka broker to connect to instead of the ZooKeeper ensemble. In addition, usage of the Console Consumer with the old consumer has been deprecated and it will be
<li> Kafka clusters can now be uniquely identified by a cluster id. It will be automatically generated when a broker is upgraded to 0.10.1.0. The cluster id is available via the kafka.server:type=KafkaServer,name=ClusterId metric and it is part of the Metadata response. Serializers, client interceptors and metric reporters can receive the cluster id by implementing the ClusterResourceListener interface. </li>
<li> The BrokerState "RunningAsController" (value 4) has been removed. Due to a bug, a broker would only be in this state briefly before transitioning out of it and hence the impact of the removal should be minimal. The recommended way to detect if a given broker is the controller is via the kafka.controller:type=KafkaController,name=ActiveControllerCount metric. </li>
<li> Fetch responses have a size limit by default (50 MB for consumers and 10 MB for replication). The existing per partition limits also apply (1 MB for consumers
and replication). Note that neither of these limits is an absolute maximum as explained in the next point. </li>
<li> Consumers and replicas can make progress if a message larger than the response/partition size limit is found. More concretely, if the first message in the
first non-empty partition of the fetch is larger than either or both limits, the message will still be returned. </li>
<li> Overloaded constructors were added to <code>kafka.api.FetchRequest</code> and <code>kafka.javaapi.FetchRequest</code> to allow the caller to specify the
order of the partitions (since order is significant in v3). The previously existing constructors were deprecated and the partitions are shuffled before
the request is sent to avoid starvation issues. </li>
0.10.0.0 has <ahref="#upgrade_10_breaking">potential breaking changes</a> (please review before upgrading) and possible <ahref="#upgrade_10_performance_impact"> performance impact following the upgrade</a>. By following the recommended rolling upgrade plan below, you guarantee no downtime and no performance impact during and following the upgrade.
<br>
Note: Because new protocols are introduced, it is important to upgrade your Kafka clusters before upgrading your clients.
<li> Update server.properties file on all brokers and add the following properties:
<ul>
<li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2 or 0.9.0.0).</li>
<li>log.message.format.version=CURRENT_KAFKA_VERSION (See <ahref="#upgrade_10_performance_impact">potential performance impact following the upgrade</a> for the details on what this configuration does.)
<li> Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.0.0. NOTE: You shouldn't touch log.message.format.version yet - this parameter should only change once all consumers have been upgraded to 0.10.0.0 </li>
<p><b>Note:</b> If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.
<p><b>Note:</b> Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.
Reports from the Kafka community on the performance impact have shown CPU utilization going from 20% before to 100% after an upgrade, which forced an immediate upgrade of all clients to bring performance back to normal.
To avoid such message conversion before consumers are upgraded to 0.10.0.0, one can set log.message.format.version to 0.8.2 or 0.9.0 when upgrading the broker to 0.10.0.0. This way, the broker can still use zero-copy transfer to send the data to the old consumers. Once consumers are upgraded, one can change the message format to 0.10.0 on the broker and enjoy the new message format that includes new timestamp and improved compression.
The conversion is supported to ensure compatibility and can be useful to support a few apps that have not updated to newer clients yet, but is impractical to support all consumer traffic on even an overprovisioned cluster. Therefore, it is critical to avoid the message conversion as much as possible when brokers have been upgraded but the majority of clients have not.
<li> Starting from Kafka 0.10.0.0, the message format version in Kafka is represented as the Kafka version. For example, message format 0.9.0 refers to the highest message version supported by Kafka 0.9.0. </li>
<li> Message format 0.10.0 has been introduced and it is used by default. It includes a timestamp field in the messages and relative offsets are used for compressed messages. </li>
<li> ProduceRequest/Response v2 has been introduced and it is used by default to support message format 0.10.0 </li>
<li> FetchRequest/Response v2 has been introduced and it is used by default to support message format 0.10.0 </li>
<li> MessageFormatter's package was changed from <code>kafka.tools</code> to <code>kafka.common</code></li>
<li> MessageReader's package was changed from <code>kafka.tools</code> to <code>kafka.common</code></li>
<li> MirrorMakerMessageHandler no longer exposes the <code>handle(record: MessageAndMetadata[Array[Byte], Array[Byte]])</code> method as it was never called. </li>
<li> The 0.7 KafkaMigrationTool is no longer packaged with Kafka. If you need to migrate from 0.7 to 0.10.0, please migrate to 0.8 first and then follow the documented upgrade process to upgrade from 0.8 to 0.10.0. </li>
<li> The new consumer has standardized its APIs to accept <code>java.util.Collection</code> as the sequence type for method parameters. Existing code may have to be updated to work with the 0.10.0 client library. </li>
<li> Starting from Kafka 0.10.0.0, a new client library named <b>Kafka Streams</b> is available for stream processing on data stored in Kafka topics. This new client library only works with 0.10.x and upward versioned brokers due to message format changes mentioned above. For more information please read <ahref="/{{version}}/documentation/streams">Streams documentation</a>.</li>
<li> The default value of the configuration parameter <code>receive.buffer.bytes</code> is now 64K for the new consumer.</li>
<li> The new consumer now exposes the configuration parameter <code>exclude.internal.topics</code> to restrict internal topics (such as the consumer offsets topic) from accidentally being included in regular expression subscriptions. By default, it is enabled.</li>
<li> The old Scala producer has been deprecated. Users should migrate their code to the Java producer included in the kafka-clients JAR as soon as possible. </li>
<li> The new consumer API has been marked stable. </li>
0.9.0.0 has <ahref="#upgrade_9_breaking">potential breaking changes</a> (please review before upgrading) and an inter-broker protocol change from previous versions. This means that upgraded brokers and clients may not be compatible with older versions. It is important that you upgrade your Kafka cluster before upgrading your clients. If you are using MirrorMaker downstream clusters should be upgraded first as well.
<p><b>Note:</b> If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.
<p><b>Note:</b> Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.
<li> Broker IDs above 1000 are now reserved by default to automatically assigned broker IDs. If your cluster has existing broker IDs above that threshold make sure to increase the reserved.broker.max.id broker configuration property accordingly. </li>
<li> Configuration parameter replica.lag.max.messages was removed. Partition leaders will no longer consider the number of lagging messages when deciding which replicas are in sync. </li>
<li> Configuration parameter replica.lag.time.max.ms now refers not just to the time passed since last fetch request from replica, but also to time since the replica last caught up. Replicas that are still fetching messages from leaders but did not catch up to the latest messages in replica.lag.time.max.ms will be considered out of sync. </li>
<li> Compacted topics no longer accept messages without key and an exception is thrown by the producer if this is attempted. In 0.8.x, a message without key would cause the log compaction thread to subsequently complain and quit (and stop compacting all compacted topics). </li>
<li> MirrorMaker no longer supports multiple target clusters. As a result it will only accept a single --consumer.config parameter. To mirror multiple source clusters, you will need at least one MirrorMaker instance per source cluster, each with its own consumer configuration. </li>
<li> Tools packaged under <em>org.apache.kafka.clients.tools.*</em> have been moved to <em>org.apache.kafka.tools.*</em>. All included scripts will still function as usual, only custom code directly importing these classes will be affected. </li>
<li> The default Kafka JVM performance options (KAFKA_JVM_PERFORMANCE_OPTS) have been changed in kafka-run-class.sh. </li>
<li> The kafka-topics.sh script (kafka.admin.TopicCommand) now exits with non-zero exit code on failure. </li>
<li> The kafka-topics.sh script (kafka.admin.TopicCommand) will now print a warning when topic names risk metric collisions due to the use of a '.' or '_' in the topic name, and error in the case of an actual collision. </li>
<li> The kafka-console-producer.sh script (kafka.tools.ConsoleProducer) will use the Java producer instead of the old Scala producer be default, and users have to specify 'old-producer' to use the old producer. </li>
<li> Configuration parameter log.cleaner.enable is now true by default. This means topics with a cleanup.policy=compact will now be compacted by default, and 128 MB of heap will be allocated to the cleaner process via log.cleaner.dedupe.buffer.size. You may want to review log.cleaner.dedupe.buffer.size and the other log.cleaner configuration values based on your usage of compacted topics. </li>
<li> Altering topic configuration from the kafka-topics.sh script (kafka.admin.TopicCommand) has been deprecated. Going forward, please use the kafka-configs.sh script (kafka.admin.ConfigCommand) for this functionality. </li>
<li> The kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) has been deprecated. Going forward, please use kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) for this functionality. </li>
<li> The kafka.tools.ProducerPerformance class has been deprecated. Going forward, please use org.apache.kafka.tools.ProducerPerformance for this functionality (kafka-producer-perf-test.sh will also be changed to use the new class). </li>
<li> The producer config block.on.buffer.full has been deprecated and will be removed in future release. Currently its default value has been changed to false. The KafkaProducer will no longer throw BufferExhaustedException but instead will use max.block.ms value to block, after which it will throw a TimeoutException. If block.on.buffer.full property is set to true explicitly, it will set the max.block.ms to Long.MAX_VALUE and metadata.fetch.timeout.ms will not be honoured</li>
Release 0.7 is incompatible with newer releases. Major changes were made to the API, ZooKeeper data structures, and protocol, and configuration in order to add replication (Which was missing in 0.7). The upgrade from 0.7 to later versions requires a <ahref="https://cwiki.apache.org/confluence/display/KAFKA/Migrating+from+0.7+to+0.8">special tool</a> for migration. This migration can be done without downtime.