If you want to upgrade from 0.11.0.x to 1.0.0 you don't need to do any code changes as the public API is fully backward compatible.
However, some public APIs were deprecated and thus it is recommended to update your code eventually to allow for future upgrades.
See <ahref="#streams_api_changes_100">below</a> a complete list of 1.0.0 API and semantic changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
However, some configuration parameters were deprecated and thus it is recommended to update your code eventually to allow for future upgrades.
See <ahref="#streams_api_changes_0110">below</a> a complete list of 0.11.0 API and semantic changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
If you want to upgrade from 0.10.1.x to 0.10.2, see the <ahref="/{{version}}/documentation/#upgrade_1020_streams"><b>Upgrade Section for 0.10.2</b></a>.
See <ahref="#streams_api_changes_0102">below</a> a complete list of 0.10.2 API and semantic changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
If you want to upgrade from 0.10.0.x to 0.10.1, see the <ahref="/{{version}}/documentation/#upgrade_1010_streams"><b>Upgrade Section for 0.10.1</b></a>.
It highlights incompatible changes you need to consider to upgrade your code and application.
See <ahref="#streams_api_changes_0101">below</a> a complete list of 0.10.1 API changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
With 1.0 a major API refactoring was accomplished and the new API is cleaner and easier to use.
This change includes the five main classes <code>KafakStreams<code>, <code>KStreamBuilder</code>,
<code>KStream</code>, <code>KTable</code>, and <code>TopologyBuilder</code> (and some more others).
All changes are fully backward compatible as old API is only deprecated but not removed.
We recommend to move to the new API as soon as you can.
We will summarize all API changes in the next paragraphs.
</p>
<p>
The two main classes to specify a topology via the DSL (<code>KStreamBuilder</code>)
or the Processor API (<code>TopologyBuilder</code>) were deprecated and replaced by
<code>StreamsBuilder</code> and <code>Topology<code> (both new classes are located in
package <code>org.apache.kafka.streams</code>).
Note, that <code>StreamsBuilder</code> does not extend <code>Topology</code>, i.e.,
the class hierarchy is different now.
The new classes have basically the same methods as the old ones to build a topology via DSL or Processor API.
However, some internal methods that were public in <code>KStreamBuilder</code>
and <code>TopologyBuilder</code> but not part of the actual API are not present
in the new classe any longer.
Furthermore, some overloads were simplified compared to the original classes.
See <ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-120%3A+Cleanup+Kafka+Streams+builder+API">KIP-120</a>
and <ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines">KIP-182</a>
for full details.
</p>
<p>
Changing how a topology is specified also affects <code>KafkaStreams<code> constructors,
that now only accept a <code>Topology</code>.
Using the DSL builder class <code>StreamsBuilder</code> one can get the constructed
<code>Topology</code> via <code>StreamsBuilder#build()</code>.
Additionally, a new class <code>org.apache.kafka.streams.TopologyDescription</code>
(and some more dependent classes) were added.
Those can be used to get a detailed description of the specified topology
and can be obtained by calling <code>Topology#describe()</code>.
An example using this new API is shown in the <ahref="/{{version}}/documentation/streams/quickstart">quickstart section</a>.
The Processor API was extended to allow users to schedule <code>punctuate</code> functions either based on data-driven <b>stream time</b> or wall-clock time.
As a result, the original <code>ProcessorContext#schedule</code> is deprecated with a new overloaded function that accepts a user customizable <code>Punctuator</code> callback interface, which triggers its <code>punctuate</code> API method periodically based on the <code>PunctuationType</code>.
The <code>PunctuationType</code> determines what notion of time is used for the punctuation scheduling: either <ahref="/{{version}}/documentation/streams/core-concepts#streams_time">stream time</a> or wall-clock time (by default, <b>stream time</b> is configured to represent event time via <code>TimestampExtractor</code>).
In addition, the <code>punctuate</code> function inside <code>Processor</code> is also deprecated.
</p>
<p>
Before this, users could only schedule based on stream time (i.e. <code>PunctuationType.STREAM_TIME</code>) and hence the <code>punctuate</code> function was data-driven only because stream time is determined (and advanced forward) by the timestamps derived from the input data.
If there is no data arriving at the processor, the stream time would not advance and hence punctuation will not be triggered.
On the other hand, When wall-clock time (i.e. <code>PunctuationType.SYSTEM_TIME</code>) is used, <code>punctuate</code> will be triggered purely based on wall-clock time.
So for example if the <code>Punctuator</code> function is scheduled based on <code>PunctuationType.SYSTEM_TIME</code>, if these 60 records were processed within 20 seconds,
<code>punctuate</code> would be called 2 times (one time every 10 seconds);
if these 60 records were processed within 5 seconds, then no <code>punctuate</code> would be called at all.
Users can schedule multiple <code>Punctuator</code> callbacks with different <code>PunctuationType</code>s within the same processor by simply calling <code>ProcessorContext#schedule</code> multiple times inside processor's <code>init()</code> method.
If you are monitoring on task level or processor-node / state store level Streams metrics, please note that the metrics sensor name and hierarchy was changed:
The task ids, store names and processor names are no longer in the sensor metrics names, but instead are added as tags of the sensors to achieve consistent metrics hierarchy.
As a result you may need to make corresponding code changes on your metrics reporting and monitoring tools when upgrading to 1.0.0.
Detailed metrics sensor can be found in the <ahref="#kafka_streams_monitoring">Streams Monitoring</a> section.
<h3><aid="streams_api_changes_0110"href="#streams_api_changes_0110">Streams API changes in 0.11.0.0</a></h3>
<p> Updates in <code>StreamsConfig</code>: </p>
<ul>
<li> new configuration parameter <code>processing.guarantee</code> is added </li>
<li> configuration parameter <code>key.serde</code> was deprecated and replaced by <code>default.key.serde</code></li>
<li> configuration parameter <code>value.serde</code> was deprecated and replaced by <code>default.value.serde</code></li>
<li> configuration parameter <code>timestamp.extractor</code> was deprecated and replaced by <code>default.timestamp.extractor</code></li>
<li> method <code>#keySerde()</code> was deprecated and replaced by <code>#defaultKeySerde()</code></li>
<li> method <code>#valueSerde()</code> was deprecated and replaced by <code>#defaultValueSerde()</code></li>
<li> new method <code>#defaultTimestampExtractor()</code> was added </li>
</ul>
<p> New methods in <code>TopologyBuilder</code>: </p>
<ul>
<li> added overloads for <code>#addSource()</code> that allow to define a <code>TimestampExtractor</code> per source node </li>
<li> added overloads for <code>#addGlobalStore()</code> that allow to define a <code>TimestampExtractor</code> per source node associated with the global store </li>
</ul>
<p> New methods in <code>KStreamBuilder</code>: </p>
<ul>
<li> added overloads for <code>#stream()</code> that allow to define a <code>TimestampExtractor</code> per input stream </li>
<li> added overloads for <code>#table()</code> that allow to define a <code>TimestampExtractor</code> per input table </li>
<li> added overloads for <code>#globalKTable()</code> that allow to define a <code>TimestampExtractor</code> per global table </li>
</ul>
<p> Deprecated methods in <code>KTable</code>: </p>
<ul>
<li><code>void foreach(final ForeachAction<? super K, ? super V> action)</code></li>
<li><code>void writeAsText(final String filePath, final String streamName)</code></li>
<li><code>void writeAsText(final String filePath, final Serde<K> keySerde, final Serde<V> valSerde)</code></li>
<li><code>void writeAsText(final String filePath, final String streamName, final Serde<K> keySerde, final Serde<V> valSerde)</code></li>
</ul>
<p>
The above methods have been deprecated in favor of using the Interactive Queries API.
If you want to query the current content of the state store backing the KTable, use the following approach:
</p>
<ul>
<li> Make a call to <code>KafkaStreams.store(final String storeName, final QueryableStoreType<T> queryableStoreType)</code></li>
<li> Then make a call to <code>ReadOnlyKeyValueStore.all()</code> to iterate over the keys of a <code>KTable</code>. </li>
</ul>
<p>
If you want to view the changelog stream of the <code>KTable</code> then you could call <code>KTable.toStream().print()</code>.
</p>
<p> Metrics using exactly-once semantics: </p>
<p>
If exactly-once processing is enabled via the <code>processing.guarantees</code> parameter, internally Streams switches from a producer per thread to a producer per task runtime model.
In order to distinguish the different producers, the producer's <code>client.id</code> additionally encodes the task-ID for this case.
Because the producer's <code>client.id</code> is used to report JMX metrics, it might be required to update tools that receive those metrics.
<p><code>[client.Id]</code> is either set via Streams configuration parameter <code>client.id</code> or defaults to <code>[application.id]-[processId]</code> (<code>[processId]</code> is a random UUID). </p>
<h3><aid="streams_api_changes_01021"href="#streams_api_changes_01021">Notable changes in 0.10.2.1</a></h3>
<p>
Parameter updates in <code>StreamsConfig</code>:
</p>
<ul>
<li> The default config values of embedded producer's <code>retries</code> and consumer's <code>max.poll.interval.ms</code> have been changed to improve the resiliency of a Kafka Streams application </li>
</ul>
<h3><aid="streams_api_changes_0102"href="#streams_api_changes_0102">Streams API changes in 0.10.2.0</a></h3>
<p>
New methods in <code>KafkaStreams</code>:
</p>
<ul>
<li> set a listener to react on application state change via <code>#setStateListener(StateListener listener)</code></li>
<li> retrieve the current application state via <code>#state()</code></li>
<li> retrieve the global metrics registry via <code>#metrics()</code></li>
<li> apply a timeout when closing an application via <code>#close(long timeout, TimeUnit timeUnit)</code></li>
<li> specify a custom indent when retrieving Kafka Streams information via <code>#toString(String indent)</code></li>
</ul>
<p>
Parameter updates in <code>StreamsConfig</code>:
</p>
<ul>
<li> parameter <code>zookeeper.connect</code> was deprecated; a Kafka Streams application does no longer interact with Zookeeper for topic management but uses the new broker admin protocol
<p> New methods in <code>TopologyBuilder</code>: </p>
<ul>
<li> added overloads for <code>#addSource()</code> that allow to define a <code>auto.offset.reset</code> policy per source node </li>
<li> added methods <code>#addGlobalStore()</code> to add global <code>StateStore</code>s </li>
</ul>
<p> New methods in <code>KStreamBuilder</code>: </p>
<ul>
<li> added overloads for <code>#stream()</code> and <code>#table()</code> that allow to define a <code>auto.offset.reset</code> policy per input stream/table </li>
<li> added method <code>#globalKTable()</code> to create a <code>GlobalKTable</code></li>
</ul>
<p> New joins for <code>KStream</code>: </p>
<ul>
<li> added overloads for <code>#join()</code> to join with <code>KTable</code></li>
<li> added overloads for <code>#join()</code> and <code>leftJoin()</code> to join with <code>GlobalKTable</code></li>
<li> note, join semantics in 0.10.2 were improved and thus you might see different result compared to 0.10.0.x and 0.10.1.x
(cf. <ahref="https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics">Kafka Streams Join Semantics</a> in the Apache Kafka wiki)
</ul>
<p> Aligned <code>null</code>-key handling for <code>KTable</code> joins: </p>
<ul>
<li> like all other KTable operations, <code>KTable-KTable</code> joins do not throw an exception on <code>null</code> key records anymore, but drop those records silently </li>
</ul>
<p> New window type <em>Session Windows</em>: </p>
<ul>
<li> added class <code>SessionWindows</code> to specify session windows </li>
<li> added overloads for <code>KGroupedStream</code> methods <code>#count()</code>, <code>#reduce()</code>, and <code>#aggregate()</code>
to allow session window aggregations </li>
</ul>
<p> Changes to <code>TimestampExtractor</code>: </p>
<ul>
<li> method <code>#extract()</code> has a second parameter now </li>
<li> new default timestamp extractor class <code>FailOnInvalidTimestamp</code>
(it gives the same behavior as old (and removed) default extractor <code>ConsumerRecordTimestampExtractor</code>) </li>
<li> new alternative timestamp extractor classes <code>LogAndSkipOnInvalidTimestamp</code> and <code>UsePreviousTimeOnInvalidTimestamps</code></li>
</ul>
<p> Relaxed type constraints of many DSL interfaces, classes, and methods (cf. <ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-100+-+Relax+Type+constraints+in+Kafka+Streams+API">KIP-100</a>). </p>
<h3><aid="streams_api_changes_0101"href="#streams_api_changes_0101">Streams API changes in 0.10.1.0</a></h3>
<p> Stream grouping and aggregation split into two methods: </p>
<ul>
<li> old: KStream #aggregateByKey(), #reduceByKey(), and #countByKey() </li>
<li> new: KStream#groupByKey() plus KGroupedStream #aggregate(), #reduce(), and #count() </li>
<li> Example: stream.countByKey() changes to stream.groupByKey().count() </li>
</ul>
<p> Auto Repartitioning: </p>
<ul>
<li> a call to through() after a key-changing operator and before an aggregation/join is no longer required </li>
<li> Example: stream.selectKey(...).through(...).countByKey() changes to stream.selectKey().groupByKey().count() </li>
</ul>
<p> TopologyBuilder: </p>
<ul>
<li> methods #sourceTopics(String applicationId) and #topicGroups(String applicationId) got simplified to #sourceTopics() and #topicGroups() </li>
</ul>
<p> DSL: new parameter to specify state store names: </p>
<ul>
<li> The new Interactive Queries feature requires to specify a store name for all source KTables and window aggregation result KTables (previous parameter "operator/window name" is now the storeName) </li>
<li> KStreamBuilder#table(String topic) changes to #topic(String topic, String storeName) </li>
<li> KTable#through(String topic) changes to #through(String topic, String storeName) </li>
<li> Example: stream.countByKey(TimeWindows.of("windowName", 1000)) changes to stream.groupByKey().count(TimeWindows.of(1000), "countStoreName") </li>
</ul>
<p> Windowing: </p>
<ul>
<li> Windows are not named anymore: TimeWindows.of("name", 1000) changes to TimeWindows.of(1000) (cf. DSL: new parameter to specify state store names) </li>
<li> JoinWindows has no default size anymore: JoinWindows.of("name").within(1000) changes to JoinWindows.of(1000) </li>