<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->

<script><!--#include virtual="../js/templateData.js" --></script>
<script id="content-template" type="text/x-handlebars-template">
    <h1>Upgrade Guide and API Changes</h1>
    <div class="sub-nav-sticky">
        <div class="sticky-top">
            <div style="height:35px">
                <a href="/{{version}}/documentation/streams/">Introduction</a>
                <a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
                <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
                <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
                <a href="/{{version}}/documentation/streams/architecture">Architecture</a>
                <a href="/{{version}}/documentation/streams/developer-guide/">Developer Guide</a>
                <a class="active-menu-item" href="/{{version}}/documentation/streams/upgrade-guide">Upgrade</a>
            </div>
        </div>
    </div>
    <p>
        If you want to upgrade from 1.1.x to 1.2.0 and you have customized window store implementations on the <code>ReadOnlyWindowStore</code> interface,
        you will need to update your code to incorporate the newly added public APIs; otherwise, you do not need to make any code changes.
        See <a href="#streams_api_changes_120">below</a> for a complete list of 1.2.0 API and semantic changes that allow you to advance your application and/or simplify your code base.
    </p>

    <p>
        If you want to upgrade from 1.0.x to 1.2.0 and you have customized window store implementations on the <code>ReadOnlyWindowStore</code> interface,
        you will need to update your code to incorporate the newly added public APIs.
        Otherwise, if you are using Java 7 you do not need to make any code changes, as the public API is fully backward compatible;
        but if you are using Java 8 method references in your Kafka Streams code you might need to update your code to resolve method ambiguities.
        Hot-swapping the jar file alone might not work in this case.
        See below for a complete list of <a href="#streams_api_changes_120">1.2.0</a> and <a href="#streams_api_changes_110">1.1.0</a>
        API and semantic changes that allow you to advance your application and/or simplify your code base.
    </p>
    <p>
        If you want to upgrade from 0.10.2.x or 0.11.0.x to 1.2.x and you have customized window store implementations on the <code>ReadOnlyWindowStore</code> interface,
        you will need to update your code to incorporate the newly added public APIs.
        Otherwise, if you are using Java 7 you do not need to make any code changes, as the public API is fully backward compatible;
        but if you are using Java 8 method references in your Kafka Streams code you might need to update your code to resolve method ambiguities.
        However, some public APIs were deprecated and thus it is recommended to update your code eventually to allow for future upgrades.
        See below for a complete list of <a href="#streams_api_changes_120">1.2</a>, <a href="#streams_api_changes_110">1.1</a>,
        <a href="#streams_api_changes_100">1.0</a>, and <a href="#streams_api_changes_0110">0.11.0</a> API
        and semantic changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
        Additionally, Streams API 1.1.x and newer requires broker on-disk message format version 0.10 or higher; thus, you need to make sure that the message
        format is configured correctly before you upgrade your Kafka Streams application.
    </p>
    <p>
        If you want to upgrade from 0.10.1.x to 1.2.x, see the Upgrade Sections for <a href="/{{version}}/documentation/#upgrade_1020_streams"><b>0.10.2</b></a>,
        <a href="/{{version}}/documentation/#upgrade_1100_streams"><b>0.11.0</b></a>,
        <a href="/{{version}}/documentation/#upgrade_100_streams"><b>1.0</b></a>, and
        <a href="/{{version}}/documentation/#upgrade_110_streams"><b>1.1</b></a>.
        Note that a broker's on-disk message format must be version 0.10 or higher to run a Kafka Streams application of version 1.2 or higher.
        See below for a complete list of <a href="#streams_api_changes_0102">0.10.2</a>, <a href="#streams_api_changes_0110">0.11.0</a>,
        <a href="#streams_api_changes_100">1.0</a>, <a href="#streams_api_changes_110">1.1</a>, and <a href="#streams_api_changes_120">1.2</a>
        API and semantic changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
    </p>
    <p>
        Upgrading from 0.10.0.x to 1.2.0 directly is also possible.
        Note that brokers must be on version 0.10.1 or higher and the on-disk message format must be version 0.10 or higher
        to run a Kafka Streams application of version 1.2 or higher.
        See <a href="#streams_api_changes_0101">Streams API changes in 0.10.1</a>, <a href="#streams_api_changes_0102">Streams API changes in 0.10.2</a>,
        <a href="#streams_api_changes_0110">Streams API changes in 0.11.0</a>, <a href="#streams_api_changes_100">Streams API changes in 1.0</a>,
        <a href="#streams_api_changes_110">Streams API changes in 1.1</a>, and <a href="#streams_api_changes_120">Streams API changes in 1.2</a>
        for a complete list of API changes.
        Upgrading to 1.2.0 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for the first upgrade phase
        (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
        As an alternative, an offline upgrade is also possible.
    </p>
    <p> Upgrading from 0.10.0.x to 1.2.0 in online mode: </p>
    <ul>
        <li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for the new version 1.2.0 </li>
        <li> bounce each instance of your application once </li>
        <li> prepare your newly deployed 1.2.0 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.from</code> </li>
        <li> bounce each instance of your application once more to complete the upgrade </li>
    </ul>
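    The first bounce phase above amounts to one extra entry in the Streams configuration. A minimal sketch (the application id and broker address are hypothetical placeholders; only <code>upgrade.from</code> is the setting the steps describe):

```java
import java.util.Properties;

public class UpgradePhaseOneConfig {
    public static void main(final String[] args) {
        // Phase 1 of the rolling upgrade: new 1.2.0 code speaking the old
        // 0.10.0 rebalance metadata protocol.
        final Properties props = new Properties();
        props.put("application.id", "my-streams-app");   // hypothetical
        props.put("bootstrap.servers", "broker:9092");   // hypothetical
        props.put("upgrade.from", "0.10.0");             // remove again before the second bounce
        System.out.println(props.getProperty("upgrade.from"));
    }
}
```

    For the second bounce phase, redeploy with the <code>upgrade.from</code> entry removed so the instances switch to the new metadata version.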

    <p> Upgrading from 0.10.0.x to 1.2.0 in offline mode: </p>
    <ul>
        <li> stop all old (0.10.0.x) application instances </li>
        <li> update your code and swap old code and jar file with new code and new jar file </li>
        <li> restart all new (1.2.0) application instances </li>
    </ul>

    <!-- TODO: verify release version and update `id` and `href` attributes (also at other places that link to this headline) -->
    <h3><a id="streams_api_changes_120" href="#streams_api_changes_120">Streams API changes in 1.2.0</a></h3>
    <p>
        We have added support for methods in <code>ReadOnlyWindowStore</code> which allow for querying a single window's key-value pair.
        If you have customized window store implementations on the above interface, you will need to update your code to implement the newly added method as well.
        For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-261%3A+Add+Single+Value+Fetch+in+Window+Stores">KIP-261</a>.
    </p>
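    A sketch of the single-window lookup described above, with the method signature as proposed by KIP-261 (the store name and key are hypothetical; requires the kafka-streams jar):

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;

public class SingleWindowFetch {
    // "counts-store" and the key "alice" are hypothetical; windowStartMs is
    // the start timestamp of the window you want to look up.
    public static Long lookup(final KafkaStreams streams, final long windowStartMs) {
        final ReadOnlyWindowStore<String, Long> store =
                streams.store("counts-store", QueryableStoreTypes.windowStore());
        // New in 1.2.0 (KIP-261): fetch one key's value in one window directly,
        // instead of iterating over a time range.
        return store.fetch("alice", windowStartMs);
    }
}
```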

    <p>
        We have added public <code>WindowedSerdes</code> to allow users to read from / write to a topic storing windowed table changelogs directly.
        In addition, in <code>StreamsConfig</code> we have also added <code>default.windowed.key.serde.inner</code> and <code>default.windowed.value.serde.inner</code>
        to let users specify inner serdes if the default serde classes are windowed serdes.
        For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-265%3A+Make+Windowed+Serde+to+public+APIs">KIP-265</a>.
    </p>
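    A sketch of constructing such a windowed serde; the factory method name and package follow the KIP-265 proposal and should be treated as assumptions until the release is finalized:

```java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;

public class WindowedSerdeExample {
    public static void main(final String[] args) {
        // Serde for the Windowed<String> keys of a time-windowed table changelog;
        // pass it to a plain KafkaConsumer/KafkaProducer to read or write the
        // changelog topic directly.
        final Serde<Windowed<String>> keySerde =
                WindowedSerdes.timeWindowedSerdeFrom(String.class);
        System.out.println(keySerde.getClass().getSimpleName());
    }
}
```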

    <p>
        We have deprecated the constructors of <code>KafkaStreams</code> that take a <code>StreamsConfig</code> as parameter.
        Please use the corresponding constructors that accept <code>java.util.Properties</code> instead.
        For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-245%3A+Use+Properties+instead+of+StreamsConfig+in+KafkaStreams+constructor">KIP-245</a>.
    </p>
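    In practice the change is just which object you hand to the constructor; a minimal sketch (topic names, app id, and broker address are hypothetical):

```java
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;

public class PropertiesConstructorExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("application.id", "my-streams-app");  // hypothetical
        props.put("bootstrap.servers", "broker:9092");  // hypothetical

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input").to("output");           // hypothetical topics

        // Deprecated: new KafkaStreams(builder.build(), new StreamsConfig(props));
        // Preferred since 1.2.0 (KIP-245): pass the Properties directly.
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.close();
    }
}
```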

    <p>
        Kafka 1.2.0 allows manipulating the timestamps of output records using the Processor API (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-251%3A+Allow+timestamp+manipulation+in+Processor+API">KIP-251</a>).
        To enable this new feature, <code>ProcessorContext#forward(...)</code> was modified.
        The two existing overloads <code>#forward(Object key, Object value, String childName)</code> and <code>#forward(Object key, Object value, int childIndex)</code> were deprecated, and a new overload <code>#forward(Object key, Object value, To to)</code> was added.
        The new class <code>To</code> allows you to send records to all or specific downstream processors by name and to set the timestamp for the output record.
        Forwarding based on child index is no longer supported in the new API.
    </p>
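    A sketch of the new overload inside a processor, assuming the <code>To</code> API as described by KIP-251 (the child processor name "my-sink" is hypothetical):

```java
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.To;

public class TimestampForwardingProcessor extends AbstractProcessor<String, String> {
    @Override
    public void process(final String key, final String value) {
        // Forward only to the child processor named "my-sink" (hypothetical name)
        // and stamp the output record with an explicit timestamp:
        context().forward(key, value,
                To.child("my-sink").withTimestamp(context().timestamp() + 1000L));

        // To.all() would forward to every downstream processor instead.
    }
}
```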

    <h3><a id="streams_api_changes_110" href="#streams_api_changes_110">Streams API changes in 1.1.0</a></h3>
    <p>
        We have added support for methods in <code>ReadOnlyWindowStore</code> which allow for querying <code>WindowStore</code>s without the necessity of providing keys.
        If you have customized window store implementations on the above interface, you will need to update your code to implement the newly added method as well.
        For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-205%3A+Add+all%28%29+and+range%28%29+API+to+ReadOnlyWindowStore">KIP-205</a>.
    </p>

    <p>
        There is a new artifact <code>kafka-streams-test-utils</code> providing a <code>TopologyTestDriver</code>, <code>ConsumerRecordFactory</code>, and <code>OutputVerifier</code> class.
        You can include the new artifact as a regular dependency in your unit tests and use the test driver to test the business logic of your Kafka Streams application.
        For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams">KIP-247</a>.
    </p>
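    A sketch of a driver-based test for a trivial topology, using the 1.1.0 test-utils classes named above (the topic names and config values are hypothetical; no broker is contacted):

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.OutputVerifier;

public class UppercaseTopologyTest {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input").mapValues(v -> v.toUpperCase()).to("output");

        final Properties props = new Properties();
        props.put("application.id", "test");
        props.put("bootstrap.servers", "dummy:1234");  // never contacted by the test driver
        props.put("default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("default.value.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");

        final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props);
        final ConsumerRecordFactory<String, String> factory =
                new ConsumerRecordFactory<>("input", new StringSerializer(), new StringSerializer());

        driver.pipeInput(factory.create("input", "key", "hello"));
        OutputVerifier.compareKeyValue(
                driver.readOutput("output", new StringDeserializer(), new StringDeserializer()),
                "key", "HELLO");
        driver.close();
    }
}
```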

    <p>
        The introduction of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier">KIP-220</a>
        enables you to provide configuration parameters for the embedded admin client created by Kafka Streams, similar to the embedded producer and consumer clients.
        You can provide the configs via <code>StreamsConfig</code> by adding the configs with the prefix <code>admin.</code> as defined by <code>StreamsConfig#adminClientPrefix(String)</code>
        to distinguish them from configurations of other clients that share the same config names.
    </p>
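    Since the prefix is just a string prepended to the config name, a minimal sketch needs only plain properties (the <code>retries</code> value here is an arbitrary illustration):

```java
import java.util.Properties;

public class EmbeddedAdminConfig {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        // StreamsConfig#adminClientPrefix("retries") yields "admin.retries":
        // only the embedded admin client sees this value, so it does not
        // clash with the producer's or consumer's own 'retries' setting.
        props.put("admin.retries", "10");
        System.out.println(props.getProperty("admin.retries"));
    }
}
```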

    <p>
        New method in <code>GlobalKTable</code>:
    </p>
    <ul>
        <li> A method has been provided that returns the store name associated with the <code>GlobalKTable</code>, or <code>null</code> if the store name is non-queryable. </li>
    </ul>

    <p>
        New methods in <code>KafkaStreams</code>:
    </p>
    <ul>
        <li> added overload for the constructor that allows overriding the <code>Time</code> object used for tracking system wall-clock time; this is useful for unit testing your application code. </li>
    </ul>

    <p> New methods in <code>KafkaClientSupplier</code>: </p>
    <ul>
        <li> added <code>getAdminClient(config)</code> that allows overriding the <code>AdminClient</code> used for administrative requests such as internal topic creation. </li>
    </ul>

    <p>New error handling for exceptions during production:</p>
    <ul>
        <li>added interface <code>ProductionExceptionHandler</code> that allows implementors to decide whether Streams should <code>FAIL</code> or <code>CONTINUE</code> when certain exceptions occur while trying to produce.</li>
        <li>provided an implementation, <code>DefaultProductionExceptionHandler</code>, that always fails, preserving the existing behavior by default.</li>
        <li>changing which implementation is used can be done by setting <code>default.production.exception.handler</code> to the fully qualified name of a class implementing this interface.</li>
    </ul>
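    A sketch of a custom handler that skips oversized records instead of failing the application (the class name is hypothetical; the interface and response enum are the ones named above):

```java
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
    @Override
    public void configure(final Map<String, ?> configs) { }

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        // Drop records the broker rejects for size; fail on everything else.
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        }
        return ProductionExceptionHandlerResponse.FAIL;
    }
}
```

    Register it by setting <code>default.production.exception.handler</code> to the class's fully qualified name, as described above.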

    <p> Changes in <code>StreamsResetter</code>: </p>
    <ul>
        <li> added options to specify input topics offsets to reset according to <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-171+-+Extend+Consumer+Group+Reset+Offset+for+Stream+Application">KIP-171</a> </li>
    </ul>

    <h3><a id="streams_api_changes_100" href="#streams_api_changes_100">Streams API changes in 1.0.0</a></h3>

    <p>
        With 1.0 a major API refactoring was accomplished and the new API is cleaner and easier to use.
        This change includes the five main classes <code>KafkaStreams</code>, <code>KStreamBuilder</code>,
        <code>KStream</code>, <code>KTable</code>, and <code>TopologyBuilder</code> (among others).
        All changes are fully backward compatible, as the old API is only deprecated but not removed.
        We recommend moving to the new API as soon as you can.
        We summarize all API changes in the next paragraphs.
    </p>

    <p>
        The two main classes to specify a topology via the DSL (<code>KStreamBuilder</code>)
        or the Processor API (<code>TopologyBuilder</code>) were deprecated and replaced by
        <code>StreamsBuilder</code> and <code>Topology</code> (both new classes are located in
        package <code>org.apache.kafka.streams</code>).
        Note that <code>StreamsBuilder</code> does not extend <code>Topology</code>, i.e.,
        the class hierarchy is different now.
        The new classes have basically the same methods as the old ones to build a topology via DSL or Processor API.
        However, some internal methods that were public in <code>KStreamBuilder</code>
        and <code>TopologyBuilder</code> but not part of the actual API are no longer present
        in the new classes.
        Furthermore, some overloads were simplified compared to the original classes.
        See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-120%3A+Cleanup+Kafka+Streams+builder+API">KIP-120</a>
        and <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines">KIP-182</a>
        for full details.
    </p>

    <p>
        Changing how a topology is specified also affects the <code>KafkaStreams</code> constructors,
        which now only accept a <code>Topology</code>.
        Using the DSL builder class <code>StreamsBuilder</code> one can get the constructed
        <code>Topology</code> via <code>StreamsBuilder#build()</code>.
        Additionally, a new class <code>org.apache.kafka.streams.TopologyDescription</code>
        (and some dependent classes) was added.
        These can be used to get a detailed description of the specified topology,
        which can be obtained by calling <code>Topology#describe()</code>.
        An example using this new API is shown in the <a href="/{{version}}/documentation/streams/quickstart">quickstart section</a>.
    </p>
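    A minimal sketch tying the pieces together (topic names are hypothetical):

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

public class DescribeExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input").to("output");  // hypothetical topic names

        // The builder hands you a Topology, which is what the
        // KafkaStreams constructors now accept:
        final Topology topology = builder.build();

        // TopologyDescription: a structured view of sources, processors, and sinks.
        System.out.println(topology.describe());
    }
}
```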

    <p>
        New methods in <code>KStream</code>:
    </p>
    <ul>
        <li>With the introduction of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-202+Move+merge%28%29+from+StreamsBuilder+to+KStream">KIP-202</a>
            a new method <code>merge()</code> has been created in <code>KStream</code> as the StreamsBuilder class's <code>StreamsBuilder#merge()</code> has been removed.
            The method signature was also changed: instead of providing multiple <code>KStream</code>s into the method at once, only a single <code>KStream</code> is accepted.
        </li>
    </ul>

    <p>
        New methods in <code>KafkaStreams</code>:
    </p>
    <ul>
        <li>retrieve the current runtime information about the local threads via <code>localThreadsMetadata()</code> </li>
        <li>observe the restoration of all state stores via <code>setGlobalStateRestoreListener()</code>, in which users can provide their customized implementation of the <code>org.apache.kafka.streams.processor.StateRestoreListener</code> interface</li>
    </ul>

    <p>
        Deprecated / modified methods in <code>KafkaStreams</code>:
    </p>
    <ul>
        <li>
            <code>toString()</code> and <code>toString(final String indent)</code> were previously used to return static and runtime information.
            They have been deprecated in favor of using the new classes/methods <code>localThreadsMetadata()</code> / <code>ThreadMetadata</code> (returning runtime information) and
            <code>TopologyDescription</code> / <code>Topology#describe()</code> (returning static information).
        </li>
        <li>
            With the introduction of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines">KIP-182</a>
            you should no longer pass in a <code>Serde</code> to <code>KStream#print</code> operations.
            If you cannot rely on using <code>toString</code> to print your keys and values, you should instead provide a custom <code>KeyValueMapper</code> via the <code>Printed#withKeyValueMapper</code> call.
        </li>
        <li>
            <code>setStateListener()</code> can now only be set before the application starts running, i.e., before <code>KafkaStreams.start()</code> is called.
        </li>
    </ul>

    <p>
        Deprecated methods in <code>KGroupedStream</code>:
    </p>
    <ul>
        <li>
            Windowed aggregations have been deprecated from <code>KGroupedStream</code> and moved to <code>WindowedKStream</code>.
            You can now perform a windowed aggregation by, for example, using <code>KGroupedStream#windowedBy(Windows)#reduce(Reducer)</code>.
        </li>
    </ul>

    <p>
        Modified methods in <code>Processor</code>:
    </p>
    <ul>
        <li>
            <p>
                The Processor API was extended to allow users to schedule <code>punctuate</code> functions either based on data-driven <b>stream time</b> or wall-clock time.
                As a result, the original <code>ProcessorContext#schedule</code> is deprecated in favor of a new overload that accepts a user-customizable <code>Punctuator</code> callback interface, which triggers its <code>punctuate</code> API method periodically based on the <code>PunctuationType</code>.
                The <code>PunctuationType</code> determines what notion of time is used for the punctuation scheduling: either <a href="/{{version}}/documentation/streams/core-concepts#streams_time">stream time</a> or wall-clock time (by default, <b>stream time</b> is configured to represent event time via <code>TimestampExtractor</code>).
                In addition, the <code>punctuate</code> function inside <code>Processor</code> is also deprecated.
            </p>
            <p>
                Before this, users could only schedule based on stream time (i.e. <code>PunctuationType.STREAM_TIME</code>) and hence the <code>punctuate</code> function was data-driven only, because stream time is determined (and advanced forward) by the timestamps derived from the input data.
                If there is no data arriving at the processor, the stream time would not advance and hence punctuation would not be triggered.
                For example, if the punctuation interval is 10 seconds and the processor receives 60 records with consecutive timestamps from 1 to 60 seconds, <code>punctuate</code> would be called 6 times, regardless of how long processing those records actually takes.
                On the other hand, when wall-clock time (i.e. <code>PunctuationType.WALL_CLOCK_TIME</code>) is used, <code>punctuate</code> is triggered purely based on wall-clock time.
                Reusing the example above, if the <code>Punctuator</code> function is scheduled based on <code>PunctuationType.WALL_CLOCK_TIME</code> and these 60 records were processed within 20 seconds,
                <code>punctuate</code> would be called 2 times (one time every 10 seconds);
                if these 60 records were processed within 5 seconds, then no <code>punctuate</code> would be called at all.
                Users can schedule multiple <code>Punctuator</code> callbacks with different <code>PunctuationType</code>s within the same processor by simply calling <code>ProcessorContext#schedule</code> multiple times inside the processor's <code>init()</code> method.
            </p>
        </li>
    </ul>
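    A sketch of the new <code>schedule()</code> overload inside a processor's <code>init()</code> method, showing both punctuation types (the interval and the commit call are illustrative choices):

```java
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

public class PunctuatingProcessor extends AbstractProcessor<String, String> {
    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        // Data-driven punctuation: fires as stream time advances past each 10s boundary.
        context.schedule(10_000L, PunctuationType.STREAM_TIME,
                timestamp -> System.out.println("stream time reached " + timestamp));
        // Wall-clock punctuation: fires every 10s of real time, with or without input.
        context.schedule(10_000L, PunctuationType.WALL_CLOCK_TIME,
                timestamp -> context().commit());
    }

    @Override
    public void process(final String key, final String value) { }
}
```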

    <p>
        If you are monitoring task-level or processor-node / state store level Streams metrics, please note that the metrics sensor name and hierarchy were changed:
        the task IDs, store names, and processor names are no longer part of the sensor metrics names, but are instead added as tags of the sensors to achieve a consistent metrics hierarchy.
        As a result you may need to make corresponding code changes to your metrics reporting and monitoring tools when upgrading to 1.0.0.
        Detailed metrics sensors can be found in the <a href="#kafka_streams_monitoring">Streams Monitoring</a> section.
    </p>

    <p>
        The introduction of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers">KIP-161</a>
        enables you to provide a default exception handler for deserialization errors when reading data from Kafka rather than throwing the exception all the way out of your streams application.
        You can provide the configs via the <code>StreamsConfig</code> as <code>StreamsConfig#DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG</code>.
        The specified handler must implement the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface.
    </p>

    <p>
        The introduction of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-173%3A+Add+prefix+to+StreamsConfig+to+enable+setting+default+internal+topic+configs">KIP-173</a>
        enables you to provide topic configuration parameters for any topics created by Kafka Streams.
        This includes repartition and changelog topics.
        You can provide the configs via the <code>StreamsConfig</code> by adding the configs with the prefix as defined by <code>StreamsConfig#topicPrefix(String)</code>.
        Any properties in the <code>StreamsConfig</code> with the prefix will be applied when creating internal topics.
        Any configs that aren't topic configs will be ignored.
        If you already use <code>StateStoreSupplier</code> or <code>Materialized</code> to provide configs for changelogs, then they will take precedence over those supplied in the config.
    </p>
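    Since <code>StreamsConfig#topicPrefix(String)</code> simply prepends <code>topic.</code> to a topic config name, a minimal sketch needs only plain properties (the particular config values are arbitrary illustrations):

```java
import java.util.Properties;

public class InternalTopicConfigExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        // Every internal repartition and changelog topic Streams creates
        // gets these settings; non-topic configs under the prefix are ignored.
        props.put("topic.retention.ms", "86400000");    // example topic config
        props.put("topic.segment.bytes", "104857600");  // example topic config
        System.out.println(props.getProperty("topic.retention.ms"));
    }
}
```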

    <h3><a id="streams_api_changes_0110" href="#streams_api_changes_0110">Streams API changes in 0.11.0.0</a></h3>

    <p> Updates in <code>StreamsConfig</code>: </p>
    <ul>
        <li> new configuration parameter <code>processing.guarantee</code> is added </li>
        <li> configuration parameter <code>key.serde</code> was deprecated and replaced by <code>default.key.serde</code> </li>
        <li> configuration parameter <code>value.serde</code> was deprecated and replaced by <code>default.value.serde</code> </li>
        <li> configuration parameter <code>timestamp.extractor</code> was deprecated and replaced by <code>default.timestamp.extractor</code> </li>
        <li> method <code>keySerde()</code> was deprecated and replaced by <code>defaultKeySerde()</code> </li>
        <li> method <code>valueSerde()</code> was deprecated and replaced by <code>defaultValueSerde()</code> </li>
        <li> new method <code>defaultTimestampExtractor()</code> was added </li>
    </ul>

    <p> New methods in <code>TopologyBuilder</code>: </p>
    <ul>
        <li> added overloads for <code>addSource()</code> that allow defining a <code>TimestampExtractor</code> per source node </li>
        <li> added overloads for <code>addGlobalStore()</code> that allow defining a <code>TimestampExtractor</code> per source node associated with the global store </li>
    </ul>

    <p> New methods in <code>KStreamBuilder</code>: </p>
    <ul>
        <li> added overloads for <code>stream()</code> that allow defining a <code>TimestampExtractor</code> per input stream </li>
        <li> added overloads for <code>table()</code> that allow defining a <code>TimestampExtractor</code> per input table </li>
        <li> added overloads for <code>globalKTable()</code> that allow defining a <code>TimestampExtractor</code> per global table </li>
    </ul>

    <p> Deprecated methods in <code>KTable</code>: </p>
    <ul>
        <li> <code>void foreach(final ForeachAction<? super K, ? super V> action)</code> </li>
        <li> <code>void print()</code> </li>
        <li> <code>void print(final String streamName)</code> </li>
        <li> <code>void print(final Serde<K> keySerde, final Serde<V> valSerde)</code> </li>
        <li> <code>void print(final Serde<K> keySerde, final Serde<V> valSerde, final String streamName)</code> </li>
        <li> <code>void writeAsText(final String filePath)</code> </li>
        <li> <code>void writeAsText(final String filePath, final String streamName)</code> </li>
        <li> <code>void writeAsText(final String filePath, final Serde<K> keySerde, final Serde<V> valSerde)</code> </li>
        <li> <code>void writeAsText(final String filePath, final String streamName, final Serde<K> keySerde, final Serde<V> valSerde)</code> </li>
    </ul>

    <p>
        The above methods have been deprecated in favor of using the Interactive Queries API.
        If you want to query the current content of the state store backing the KTable, use the following approach:
    </p>
    <ul>
        <li> Make a call to <code>KafkaStreams.store(final String storeName, final QueryableStoreType<T> queryableStoreType)</code> </li>
        <li> Then make a call to <code>ReadOnlyKeyValueStore.all()</code> to iterate over the keys of a <code>KTable</code>. </li>
    </ul>
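    The two steps above can be sketched as follows (the store name "table-store" is a hypothetical queryable name backing the KTable):

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class TableDump {
    public static void dump(final KafkaStreams streams) {
        // Step 1: look up the store backing the KTable by name.
        final ReadOnlyKeyValueStore<String, Long> store =
                streams.store("table-store", QueryableStoreTypes.keyValueStore());
        // Step 2: iterate over all current key-value pairs.
        final KeyValueIterator<String, Long> iterator = store.all();
        while (iterator.hasNext()) {
            final KeyValue<String, Long> entry = iterator.next();
            System.out.println(entry.key + " -> " + entry.value);
        }
        iterator.close();  // always close iterators to release underlying resources
    }
}
```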

    <p>
        If you want to view the changelog stream of the <code>KTable</code> then you could call <code>KTable.toStream().print(Printed.toSysOut)</code>.
    </p>

    <p> Metrics using exactly-once semantics: </p>
    <p>
        If exactly-once processing is enabled via the <code>processing.guarantee</code> parameter, internally Streams switches from a producer-per-thread to a producer-per-task runtime model.
        In order to distinguish the different producers, the producer's <code>client.id</code> additionally encodes the task ID for this case.
        Because the producer's <code>client.id</code> is used to report JMX metrics, it might be required to update tools that receive those metrics.
    </p>

    <p> Producer's <code>client.id</code> naming schema: </p>
    <ul>
        <li> at-least-once (default): <code>[client.Id]-StreamThread-[sequence-number]</code> </li>
        <li> exactly-once: <code>[client.Id]-StreamThread-[sequence-number]-[taskId]</code> </li>
    </ul>
    <p> <code>[client.Id]</code> is either set via Streams configuration parameter <code>client.id</code> or defaults to <code>[application.id]-[processId]</code> (<code>[processId]</code> is a random UUID). </p>

    <h3><a id="streams_api_changes_01021" href="#streams_api_changes_01021">Notable changes in 0.10.2.1</a></h3>

    <p>
        Parameter updates in <code>StreamsConfig</code>:
    </p>
    <ul>
        <li> The default config values of embedded producer's <code>retries</code> and consumer's <code>max.poll.interval.ms</code> have been changed to improve the resiliency of a Kafka Streams application </li>
    </ul>

    <h3><a id="streams_api_changes_0102" href="#streams_api_changes_0102">Streams API changes in 0.10.2.0</a></h3>

    <p>
        New methods in <code>KafkaStreams</code>:
    </p>
    <ul>
        <li> set a listener to react on application state change via <code>setStateListener(StateListener listener)</code> </li>
        <li> retrieve the current application state via <code>state()</code> </li>
        <li> retrieve the global metrics registry via <code>metrics()</code> </li>
        <li> apply a timeout when closing an application via <code>close(long timeout, TimeUnit timeUnit)</code> </li>
        <li> specify a custom indent when retrieving Kafka Streams information via <code>toString(String indent)</code> </li>
    </ul>

    <p>
        Parameter updates in <code>StreamsConfig</code>:
    </p>
    <ul>
        <li> parameter <code>zookeeper.connect</code> was deprecated; a Kafka Streams application no longer interacts with ZooKeeper for topic management but uses the new broker admin protocol
            (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-TopicAdminSchema.1">KIP-4, Section "Topic Admin Schema"</a>) </li>
        <li> added many new parameters for metrics, security, and client configurations </li>
    </ul>

    <p> Changes in <code>StreamsMetrics</code> interface: </p>
    <ul>
        <li> removed methods: <code>addLatencySensor()</code> </li>
        <li> added methods: <code>addLatencyAndThroughputSensor()</code>, <code>addThroughputSensor()</code>, <code>recordThroughput()</code>,
             <code>addSensor()</code>, <code>removeSensor()</code> </li>
    </ul>

    <p> New methods in <code>TopologyBuilder</code>: </p>
    <ul>
        <li> added overloads for <code>addSource()</code> that allow defining an <code>auto.offset.reset</code> policy per source node </li>
        <li> added methods <code>addGlobalStore()</code> to add global <code>StateStore</code>s </li>
    </ul>

    <p> New methods in <code>KStreamBuilder</code>: </p>
    <ul>
        <li> added overloads for <code>stream()</code> and <code>table()</code> that allow defining an <code>auto.offset.reset</code> policy per input stream/table </li>
        <li> added method <code>globalKTable()</code> to create a <code>GlobalKTable</code> </li>
    </ul>

    <p> New joins for <code>KStream</code>: </p>
    <ul>
        <li> added overloads for <code>join()</code> to join with <code>KTable</code> </li>
        <li> added overloads for <code>join()</code> and <code>leftJoin()</code> to join with <code>GlobalKTable</code> </li>
        <li> note, join semantics in 0.10.2 were improved and thus you might see different results compared to 0.10.0.x and 0.10.1.x
             (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics">Kafka Streams Join Semantics</a> in the Apache Kafka wiki) </li>
    </ul>

    <p> Aligned <code>null</code>-key handling for <code>KTable</code> joins: </p>
    <ul>
        <li> like all other KTable operations, <code>KTable-KTable</code> joins do not throw an exception on <code>null</code> key records anymore, but drop those records silently </li>
    </ul>

    <p> New window type <em>Session Windows</em>: </p>
    <ul>
        <li> added class <code>SessionWindows</code> to specify session windows </li>
        <li> added overloads for <code>KGroupedStream</code> methods <code>count()</code>, <code>reduce()</code>, and <code>aggregate()</code>
             to allow session window aggregations </li>
    </ul>

    <p> Changes to <code>TimestampExtractor</code>: </p>
    <ul>
        <li> method <code>extract()</code> has a second parameter now </li>
        <li> new default timestamp extractor class <code>FailOnInvalidTimestamp</code>
             (it gives the same behavior as the old (and removed) default extractor <code>ConsumerRecordTimestampExtractor</code>) </li>
        <li> new alternative timestamp extractor classes <code>LogAndSkipOnInvalidTimestamp</code> and <code>UsePreviousTimeOnInvalidTimestamp</code> </li>
    </ul>

    <p> Relaxed type constraints of many DSL interfaces, classes, and methods (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-100+-+Relax+Type+constraints+in+Kafka+Streams+API">KIP-100</a>). </p>

    <h3><a id="streams_api_changes_0101" href="#streams_api_changes_0101">Streams API changes in 0.10.1.0</a></h3>

    <p> Stream grouping and aggregation split into two methods: </p>
    <ul>
        <li> old: KStream #aggregateByKey(), #reduceByKey(), and #countByKey() </li>
        <li> new: KStream#groupByKey() plus KGroupedStream #aggregate(), #reduce(), and #count() </li>
        <li> Example: stream.countByKey() changes to stream.groupByKey().count() </li>
    </ul>

    <p> Auto Repartitioning: </p>
    <ul>
        <li> a call to through() after a key-changing operator and before an aggregation/join is no longer required </li>
        <li> Example: stream.selectKey(...).through(...).countByKey() changes to stream.selectKey(...).groupByKey().count() </li>
    </ul>

    <p> TopologyBuilder: </p>
    <ul>
        <li> methods #sourceTopics(String applicationId) and #topicGroups(String applicationId) were simplified to #sourceTopics() and #topicGroups() </li>
    </ul>

    <p> DSL: new parameter to specify state store names: </p>
    <ul>
        <li> The new Interactive Queries feature requires specifying a store name for all source KTables and window aggregation result KTables (the previous parameter "operator/window name" is now the storeName) </li>
        <li> KStreamBuilder#table(String topic) changes to #table(String topic, String storeName) </li>
        <li> KTable#through(String topic) changes to #through(String topic, String storeName) </li>
        <li> KGroupedStream #aggregate(), #reduce(), and #count() require the additional parameter "String storeName" </li>
        <li> Example: stream.countByKey(TimeWindows.of("windowName", 1000)) changes to stream.groupByKey().count(TimeWindows.of(1000), "countStoreName") </li>
    </ul>

    <p> Windowing: </p>
    <ul>
        <li> Windows are not named anymore: TimeWindows.of("name", 1000) changes to TimeWindows.of(1000) (cf. DSL: new parameter to specify state store names) </li>
        <li> JoinWindows has no default size anymore: JoinWindows.of("name").within(1000) changes to JoinWindows.of(1000) </li>
    </ul>

    <div class="pagination">
        <a href="/{{version}}/documentation/streams/developer-guide/app-reset-tool" class="pagination__btn pagination__btn__prev">Previous</a>
        <a href="#" class="pagination__btn pagination__btn__next pagination__btn--disabled">Next</a>
    </div>
</script>

<!--#include virtual="../../includes/_header.htm" -->
<!--#include virtual="../../includes/_top.htm" -->
<div class="content documentation documentation--current">
    <!--#include virtual="../../includes/_nav.htm" -->
    <div class="right">
        <!--#include virtual="../../includes/_docs_banner.htm" -->
        <ul class="breadcrumbs">
            <li><a href="/documentation">Documentation</a></li>
            <li><a href="/documentation/streams">Kafka Streams</a></li>
        </ul>
        <div class="p-content"></div>
    </div>
</div>
<!--#include virtual="../../includes/_footer.htm" -->
<script>
$(function() {
    // Show selected style on nav item
    $('.b-nav__streams').addClass('selected');

    // Display docs subnav items
    $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
});
</script>