Upgrading from any older version to 2.0.0 is possible: (1) you need to make sure to update you code accordingly, because there are some minor non-compatible API changes since older
releases (the code changes are expected to be minimal, please see below for the details),
(2) upgrading to 2.0.0 in the online mode requires two rolling bounces.
For (2), in the first rolling bounce phase users need to set config <code>upgrade.from="older version"</code> (possible values are <code>"0.10.0", "0.10.1", "0.10.2", "0.11.0", "1.0", and "1.1"</code>)
<li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to the version from which it is being upgrade to new version 2.0.0</li>
<li> bounce each instance of your application once </li>
<li> prepare your newly deployed 2.0.0 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.mode</code></li>
<li> bounce each instance of your application once more to complete the upgrade </li>
</ul>
<p> As an alternative, an offline upgrade is also possible. Upgrading from any versions as old as 0.10.0.x to 2.0.0 in offline mode require the following steps: </p>
<ul>
<li> stop all old (e.g., 0.10.0.x) application instances </li>
<li> update your code and swap old code and jar file with new code and new jar file </li>
<li> restart all new (2.0.0) application instances </li>
In 2.0.0 we have added a few new APIs on the <code>ReadOnlyWindowStore</code> interface (for details please read <ahref="#streams_api_changes_200">Streams API changes</a> below).
If you have customized window store implementations that extends the <code>ReadOnlyWindowStore</code> interface you need to make code changes.
We have added support for methods in <code>ReadOnlyWindowStore</code> which allows for querying a single window's key-value pair.
For users who have customized window store implementations on the above interface, they'd need to update their code to implement the newly added method as well.
For more details, see <ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-261%3A+Add+Single+Value+Fetch+in+Window+Stores">KIP-261</a>.
We have added public <code>WindowedSerdes</code> to allow users to read from / write to a topic storing windowed table changelogs directly.
In addition, in <code>StreamsConfig</code> we have also added <code>default.windowed.key.serde.inner</code> and <code>default.windowed.value.serde.inner</code>
to let users specify inner serdes if the default serde classes are windowed serdes.
For more details, see <ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-265%3A+Make+Windowed+Serde+to+public+APIs">KIP-265</a>.
We have deprecated constructors of <code>KafkaStreams</code> that take a <code>StreamsConfig</code> as parameter.
Please use the other corresponding constructors that accept <code>java.util.Properties</code> instead.
For more details, see <ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-245%3A+Use+Properties+instead+of+StreamsConfig+in+KafkaStreams+constructor">KIP-245</a>.
Kafka 2.0.0 allows to manipulate timestamps of output records using the Processor API (<ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-251%3A+Allow+timestamp+manipulation+in+Processor+API">KIP-251</a>).
To enable this new feature, <code>ProcessorContext#forward(...)</code> was modified.
The two existing overloads <code>#forward(Object key, Object value, String childName)</code> and <code>#forward(Object key, Object value, int childIndex)</code> were deprecated and a new overload <code>#forward(Object key, Object value, To to)</code> was added.
The new class <code>To</code> allows you to send records to all or specific downstream processors by name and to set the timestamp for the output record.
Forwarding based on child index is not supported in the new API any longer.
<ahref="https://cwiki.apache.org/confluence/x/DVyHB">KIP-284</a> changed the retention time for repartition topics by setting its default value to <code>Long.MAX_VALUE</code>.
Instead of relying on data retention Kafka Streams uses the new purge data API to delete consumed data from those topics and to keep used storage small now.
We have modified the <code>ProcessorStateManger#register(...)</code> signature and removed the deprecated <code>loggingEnabled</code> boolean parameter as it is specified in the <code>StoreBuilder</code>.
Users who used this function to register their state stores into the processor topology need to simply update their code and remove this parameter from the caller.
</p>
<p>
Kafka Streams DSL for Scala is a new Kafka Streams client library available for developers authoring Kafka Streams applications in Scala. It wraps core Kafka Streams DSL types to make it easier to call when
interoperating with Scala code. For example, it includes higher order functions as parameters for transformations avoiding the need anonymous classes in Java 7 or experimental SAM type conversions in Scala 2.11, automatic conversion between Java and Scala collection types, a way
to implicitly provide SerDes to reduce boilerplate from your application and make it more typesafe, and more! For more information see the
<ahref="/{{version}}/documentation/streams/developer-guide/dsl-api.html#scala-dsl">Kafka Streams DSL for Scala documentation</a> and
<li><code>KafkaStreams#toString</code> no longer returns the topology and runtime metadata; to get topology metadata users can call <code>Topology#describe()</code> and to get thread runtime metadata users can call <code>KafkaStreams#localThreadsMetadata</code> (they are deprecated since 1.0.0).
For detailed guidance on how to update your code please read <ahref="#streams_api_changes_100">here</a></li>
<li><code>TopologyBuilder</code> and <code>KStreamBuilder</code> are removed and replaced by <code>Topology</code> and <code>StreamsBuidler</code> respectively (they are deprecated since 1.0.0).
For detailed guidance on how to update your code please read <ahref="#streams_api_changes_100">here</a></li>
<li><code>StateStoreSupplier</code> are removed and replaced with <code>StoreBuilder</code> (they are deprecated since 1.0.0);
and the corresponding <code>Stores#create</code> and <code>KStream, KTable, KGroupedStream</code> overloaded functions that use it have also been removed.
For detailed guidance on how to update your code please read <ahref="#streams_api_changes_100">here</a></li>
<li><code>KStream, KTable, KGroupedStream</code> overloaded functions that requires serde and other specifications explicitly are removed and replaced with simpler overloaded functions that use <code>Consumed, Produced, Serialized, Materialized, Joined</code> (they are deprecated since 1.0.0).
For detailed guidance on how to update your code please read <ahref="#streams_api_changes_100">here</a></li>
<li><code>Processor#punctuate</code>, <code>ValueTransformer#punctuate</code>, <code>ValueTransformer#punctuate</code> and <code>RecordContext#schedule(long)</code> are removed and replaced by <code>RecordContext#schedule(long, PunctuationType, Punctuator)</code> (they are deprecated in 1.0.0). </li>
<li>The second <code>boolean</code> typed parameter "loggingEnabled" in <code>ProcessorContext#register</code> has been removed; users can now use <code>StoreBuilder#withLoggingEnabled, withLoggingDisabled</code> to specify the behavior when they create the state store. </li>
<li><code>KTable#writeAs, print, foreach, to, through</code> are removed, users can call <code>KTable#tostream()#writeAs</code> instead for the same purpose (they are deprecated since 0.11.0.0).
For detailed list of removed APIs please read <ahref="#streams_api_changes_0110">here</a></li>
<li><code>StreamsConfig#KEY_SERDE_CLASS_CONFIG, VALUE_SERDE_CLASS_CONFIG, TIMESTAMP_EXTRACTOR_CLASS_CONFIG</code> are removed and replaced with <code>StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG, DEFAULT_VALUE_SERDE_CLASS_CONFIG, DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG</code> respectively (they are deprecated since 0.11.0.0). </li>
<li><code>StreamsConfig#ZOOKEEPER_CONNECT_CONFIG</code> are removed as we do not need ZooKeeper dependency in Streams any more (it is deprecated since 0.10.2.0). </li>
We have added support for methods in <code>ReadOnlyWindowStore</code> which allows for querying <code>WindowStore</code>s without the necessity of providing keys.
For users who have customized window store implementations on the above interface, they'd need to update their code to implement the newly added method as well.
For more details, see <ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-205%3A+Add+all%28%29+and+range%28%29+API+to+ReadOnlyWindowStore">KIP-205</a>.
There is a new artifact <code>kafka-streams-test-utils</code> providing a <code>TopologyTestDriver</code>, <code>ConsumerRecordFactory</code>, and <code>OutputVerifier</code> class.
You can include the new artifact as a regular dependency to your unit tests and use the test driver to test your business logic of your Kafka Streams application.
For more details, see <ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams">KIP-247</a>.
The introduction of <ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier">KIP-220</a>
enables you to provide configuration parameters for the embedded admin client created by Kafka Streams, similar to the embedded producer and consumer clients.
You can provide the configs via <code>StreamsConfig</code> by adding the configs with the prefix <code>admin.</code> as defined by <code>StreamsConfig#adminClientPrefix(String)</code>
to distinguish them from configurations of other clients that share the same config names.
<li> A method has been provided such that it will return the store name associated with the <code>GlobalKTable</code> or <code>null</code> if the store name is non-queryable. </li>
<li> added overload for the constructor that allows overriding the <code>Time</code> object used for tracking system wall-clock time; this is useful for unit testing your application code. </li>
</ul>
<p> New methods in <code>KafkaClientSupplier</code>: </p>
<ul>
<li> added <code>getAdminClient(config)</code> that allows to override an <code>AdminClient</code> used for administrative requests such as internal topic creations, etc. </li>
<p>New error handling for exceptions during production:</p>
<ul>
<li>added interface <code>ProductionExceptionHandler</code> that allows implementors to decide whether or not Streams should <code>FAIL</code> or <code>CONTINUE</code> when certain exception occur while trying to produce.</li>
<li>provided an implementation, <code>DefaultProductionExceptionHandler</code> that always fails, preserving the existing behavior by default.</li>
<li>changing which implementation is used can be done by settings <code>default.production.exception.handler</code> to the fully qualified name of a class implementing this interface.</li>
<li> added options to specify input topics offsets to reset according to <ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-171+-+Extend+Consumer+Group+Reset+Offset+for+Stream+Application">KIP-171</a></li>
Furthermore, some overloads were simplified compared to the original classes.
See <ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-120%3A+Cleanup+Kafka+Streams+builder+API">KIP-120</a>
and <ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines">KIP-182</a>
<li>With the introduction of <ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-202+Move+merge%28%29+from+StreamsBuilder+to+KStream">KIP-202</a>
a new method <code>merge()</code> has been created in <code>KStream</code> as the StreamsBuilder class's <code>StreamsBuilder#merge()</code> has been removed.
The method signature was also changed, too: instead of providing multiple <code>KStream</code>s into the method at the once, only a single <code>KStream</code> is accepted.
<li>retrieve the current runtime information about the local threads via <code>localThreadsMetadata()</code></li>
<li>observe the restoration of all state stores via <code>setGlobalStateRestoreListener()</code>, in which users can provide their customized implementation of the <code>org.apache.kafka.streams.processor.StateRestoreListener</code> interface</li>
They have been deprecated in favor of using the new classes/methods <code>localThreadsMetadata()</code> / <code>ThreadMetadata</code> (returning runtime information) and
With the introduction of <ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines">KIP-182</a>
you should no longer pass in <code>Serde</code> to <code>KStream#print</code> operations.
If you can't rely on using <code>toString</code> to print your keys an values, you should instead you provide a custom <code>KeyValueMapper</code> via the <code>Printed#withKeyValueMapper</code> call.
The Processor API was extended to allow users to schedule <code>punctuate</code> functions either based on data-driven <b>stream time</b> or wall-clock time.
As a result, the original <code>ProcessorContext#schedule</code> is deprecated with a new overloaded function that accepts a user customizable <code>Punctuator</code> callback interface, which triggers its <code>punctuate</code> API method periodically based on the <code>PunctuationType</code>.
The <code>PunctuationType</code> determines what notion of time is used for the punctuation scheduling: either <ahref="/{{version}}/documentation/streams/core-concepts#streams_time">stream time</a> or wall-clock time (by default, <b>stream time</b> is configured to represent event time via <code>TimestampExtractor</code>).
In addition, the <code>punctuate</code> function inside <code>Processor</code> is also deprecated.
</p>
<p>
Before this, users could only schedule based on stream time (i.e. <code>PunctuationType.STREAM_TIME</code>) and hence the <code>punctuate</code> function was data-driven only because stream time is determined (and advanced forward) by the timestamps derived from the input data.
If there is no data arriving at the processor, the stream time would not advance and hence punctuation will not be triggered.
On the other hand, When wall-clock time (i.e. <code>PunctuationType.WALL_CLOCK_TIME</code>) is used, <code>punctuate</code> will be triggered purely based on wall-clock time.
So for example if the <code>Punctuator</code> function is scheduled based on <code>PunctuationType.WALL_CLOCK_TIME</code>, if these 60 records were processed within 20 seconds,
<code>punctuate</code> would be called 2 times (one time every 10 seconds);
if these 60 records were processed within 5 seconds, then no <code>punctuate</code> would be called at all.
Users can schedule multiple <code>Punctuator</code> callbacks with different <code>PunctuationType</code>s within the same processor by simply calling <code>ProcessorContext#schedule</code> multiple times inside processor's <code>init()</code> method.
If you are monitoring on task level or processor-node / state store level Streams metrics, please note that the metrics sensor name and hierarchy was changed:
The task ids, store names and processor names are no longer in the sensor metrics names, but instead are added as tags of the sensors to achieve consistent metrics hierarchy.
As a result you may need to make corresponding code changes on your metrics reporting and monitoring tools when upgrading to 1.0.0.
Detailed metrics sensor can be found in the <ahref="#kafka_streams_monitoring">Streams Monitoring</a> section.
The introduction of <ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers">KIP-161</a>
enables you to provide a default exception handler for deserialization errors when reading data from Kafka rather than throwing the exception all the way out of your streams application.
You can provide the configs via the <code>StreamsConfig</code> as <code>StreamsConfig#DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG</code>.
The specified handler must implement the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface.
The introduction of <ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-173%3A+Add+prefix+to+StreamsConfig+to+enable+setting+default+internal+topic+configs">KIP-173</a>
enables you to provide topic configuration parameters for any topics created by Kafka Streams.
This includes repartition and changelog topics.
You can provide the configs via the <code>StreamsConfig</code> by adding the configs with the prefix as defined by <code>StreamsConfig#topicPrefix(String)</code>.
Any properties in the <code>StreamsConfig</code> with the prefix will be applied when creating internal topics.
Any configs that aren't topic configs will be ignored.
If you already use <code>StateStoreSupplier</code> or <code>Materialized</code> to provide configs for changelogs, then they will take precedence over those supplied in the config.
<li> added overloads for <code>addSource()</code> that allow to define a <code>TimestampExtractor</code> per source node </li>
<li> added overloads for <code>addGlobalStore()</code> that allow to define a <code>TimestampExtractor</code> per source node associated with the global store </li>
If exactly-once processing is enabled via the <code>processing.guarantees</code> parameter, internally Streams switches from a producer per thread to a producer per task runtime model.
In order to distinguish the different producers, the producer's <code>client.id</code> additionally encodes the task-ID for this case.
Because the producer's <code>client.id</code> is used to report JMX metrics, it might be required to update tools that receive those metrics.
<p><code>[client.Id]</code> is either set via Streams configuration parameter <code>client.id</code> or defaults to <code>[application.id]-[processId]</code> (<code>[processId]</code> is a random UUID). </p>
<h3><aid="streams_api_changes_01021"href="#streams_api_changes_01021">Notable changes in 0.10.2.1</a></h3>
<p>
Parameter updates in <code>StreamsConfig</code>:
</p>
<ul>
<li> The default config values of embedded producer's <code>retries</code> and consumer's <code>max.poll.interval.ms</code> have been changed to improve the resiliency of a Kafka Streams application </li>
</ul>
<h3><aid="streams_api_changes_0102"href="#streams_api_changes_0102">Streams API changes in 0.10.2.0</a></h3>
<li> parameter <code>zookeeper.connect</code> was deprecated; a Kafka Streams application does no longer interact with ZooKeeper for topic management but uses the new broker admin protocol
<li> added overloads for <code>stream()</code> and <code>table()</code> that allow to define a <code>auto.offset.reset</code> policy per input stream/table </li>
<li> added method <code>globalKTable()</code> to create a <code>GlobalKTable</code></li>
<li> note, join semantics in 0.10.2 were improved and thus you might see different result compared to 0.10.0.x and 0.10.1.x
(cf. <ahref="https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics">Kafka Streams Join Semantics</a> in the Apache Kafka wiki)
</ul>
<p> Aligned <code>null</code>-key handling for <code>KTable</code> joins: </p>
<ul>
<li> like all other KTable operations, <code>KTable-KTable</code> joins do not throw an exception on <code>null</code> key records anymore, but drop those records silently </li>
</ul>
<p> New window type <em>Session Windows</em>: </p>
<ul>
<li> added class <code>SessionWindows</code> to specify session windows </li>
<li> new default timestamp extractor class <code>FailOnInvalidTimestamp</code>
(it gives the same behavior as old (and removed) default extractor <code>ConsumerRecordTimestampExtractor</code>) </li>
<li> new alternative timestamp extractor classes <code>LogAndSkipOnInvalidTimestamp</code> and <code>UsePreviousTimeOnInvalidTimestamps</code></li>
</ul>
<p> Relaxed type constraints of many DSL interfaces, classes, and methods (cf. <ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-100+-+Relax+Type+constraints+in+Kafka+Streams+API">KIP-100</a>). </p>
<h3><aid="streams_api_changes_0101"href="#streams_api_changes_0101">Streams API changes in 0.10.1.0</a></h3>
<p> Stream grouping and aggregation split into two methods: </p>
<ul>
<li> old: KStream #aggregateByKey(), #reduceByKey(), and #countByKey() </li>
<li> new: KStream#groupByKey() plus KGroupedStream #aggregate(), #reduce(), and #count() </li>
<li> Example: stream.countByKey() changes to stream.groupByKey().count() </li>
</ul>
<p> Auto Repartitioning: </p>
<ul>
<li> a call to through() after a key-changing operator and before an aggregation/join is no longer required </li>
<li> Example: stream.selectKey(...).through(...).countByKey() changes to stream.selectKey().groupByKey().count() </li>
</ul>
<p> TopologyBuilder: </p>
<ul>
<li> methods #sourceTopics(String applicationId) and #topicGroups(String applicationId) got simplified to #sourceTopics() and #topicGroups() </li>
</ul>
<p> DSL: new parameter to specify state store names: </p>
<ul>
<li> The new Interactive Queries feature requires to specify a store name for all source KTables and window aggregation result KTables (previous parameter "operator/window name" is now the storeName) </li>
<li> KStreamBuilder#table(String topic) changes to #topic(String topic, String storeName) </li>
<li> KTable#through(String topic) changes to #through(String topic, String storeName) </li>
<li> Example: stream.countByKey(TimeWindows.of("windowName", 1000)) changes to stream.groupByKey().count(TimeWindows.of(1000), "countStoreName") </li>
</ul>
<p> Windowing: </p>
<ul>
<li> Windows are not named anymore: TimeWindows.of("name", 1000) changes to TimeWindows.of(1000) (cf. DSL: new parameter to specify state store names) </li>
<li> JoinWindows has no default size anymore: JoinWindows.of("name").within(1000) changes to JoinWindows.of(1000) </li>