KAFKA-10679: [Streams] migrate kafka-site updated docs to kafka/docs (#9554)
During the AK website upgrade, changes made to kafka-site weren't migrated back to kafka-docs.
This PR is an attempt at porting the streams changes to kafka/docs
For the most part, the bulk of the changes in the PR are cosmetic.
For testing:
I reviewed the PR diffs
Rendered the changes locally
Reviewers: John Roesler <john@confluent.io>
<h3><aid="streams_architecture_tasks"href="#streams_architecture_tasks">Stream Partitions and Tasks</a></h3>
<h3class="anchor-heading"><aid="streams_architecture_tasks"class="anchor-link"></a><ahref="#streams_architecture_tasks">Stream Partitions and Tasks</a></h3>
<p>
The messaging layer of Kafka partitions data for storing and transporting it. Kafka Streams partitions data for processing it.
Kafka Streams allows the user to configure the number of <b>threads</b> that the library can use to parallelize processing within an application instance.
@@ -112,7 +112,7 @@
</p>
<br>
<h3><aid="streams_architecture_state"href="#streams_architecture_state">Local State Stores</a></h3>
<h3class="anchor-heading"><aid="streams_architecture_state"class="anchor-link"></a><ahref="#streams_architecture_state">Local State Stores</a></h3>
<p>
Kafka Streams provides so-called <b>state stores</b>, which can be used by stream processing applications to store and query data,
Kafka Streams builds on fault-tolerance capabilities integrated natively within Kafka. Kafka partitions are highly available and replicated; so when stream data is persisted to Kafka it is available
<li>A <b>stream</b> is the most important abstraction provided by Kafka Streams: it represents an unbounded, continuously updating data set. A stream is an ordered, replayable, and fault-tolerant sequence of immutable data records, where a <b>data record</b> is defined as a key-value pair.</li>
@@ -88,7 +88,7 @@
At runtime, the logical topology is instantiated and replicated inside the application for parallel processing (see <a href="/{{version}}/documentation/streams/architecture#streams_architecture_tasks"><b>Stream Partitions and Tasks</b></a> for details).
A critical aspect in stream processing is the notion of <b>time</b>, and how it is modeled and integrated.
@@ -157,7 +157,7 @@
</p>
<h3><aid="streams_concepts_duality"href="#streams-concepts-duality">Duality of Streams and Tables</a></h3>
<h3class="anchor-heading"><aid="streams_concepts_duality"class="anchor-link"></a><ahref="#streams_concepts_duality">Duality of Streams and Tables</a></h3>
<p>
When implementing stream processing use cases in practice, you typically need both <strong>streams</strong> and also <strong>databases</strong>.
An example use case that is very common in practice is an e-commerce application that enriches an incoming <em>stream</em> of customer
@@ -176,9 +176,9 @@
or to run <a id="streams-developer-guide-interactive-queries" href="/{{version}}/documentation/streams/developer-guide/interactive-queries#interactive-queries">interactive queries</a>
against your application's latest processing results. And, beyond its internal usage, the Kafka Streams API
also allows developers to exploit this duality in their own applications.
</p>
<p>
Before we discuss concepts such as <a id="streams-developer-guide-dsl-aggregating" href="/{{version}}/documentation/streams/developer-guide/dsl-api#aggregating">aggregations</a>
in Kafka Streams, we must first introduce <strong>tables</strong> in more detail, and talk about the aforementioned stream-table duality.
Essentially, this duality means that a stream can be viewed as a table, and a table can be viewed as a stream. Kafka's log compaction feature, for example, exploits this duality.
In stream processing, one of the most frequently asked questions is "does my stream processing system guarantee that each record is processed once and only once, even if some failures are encountered in the middle of processing?"
@@ -304,7 +304,7 @@
For more information, see the <a href="/{{version}}/documentation/streams/developer-guide/config-streams.html">Kafka Streams Configs</a> section.
<h2>Step 1: Run the application reset tool<a class="headerlink" href="#step-1-run-the-application-reset-tool" title="Permalink to this headline"></a></h2>
<p>Invoke the application reset tool from the command line</p>
The maximum acceptable lag (total number of offsets to catch up from the changelog) for an instance to be considered caught-up and able to receive an active task. Streams will only assign
stateful active tasks to instances whose state stores are within the acceptable recovery lag, if any exist, and assign warmup replicas to restore state in the background for instances
that are not yet caught up. Should correspond to a recovery time of well under a minute for a given workload. Must be at least 0.
</p>
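<p>
As a rough sketch only (not part of this hunk; it assumes the <code>StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG</code> key introduced alongside this feature), the threshold could be tightened like this:
</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class RecoveryLagConfigExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Only instances whose state stores are within 5,000 offsets of the
        // changelog head are considered caught up for active task assignment.
        props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 5000L);
    }
}</code></pre>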
<p>
Note: if you set this to <code>Long.MAX_VALUE</code> it effectively disables the warmup replicas and task high availability, allowing Streams to immediately produce a balanced
@@ -390,8 +389,7 @@
The drawback of this approach is that "manual" writes are side effects that are invisible to the Kafka Streams runtime library,
so they do not benefit from the end-to-end processing guarantees of the Streams API:</p>
<preclass="brush: java;">
public class SendToDeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {
<preclass="line-numbers"><codeclass="language-java"> public class SendToDeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {
KafkaProducer<byte[], byte[]> dlqProducer;
String dlqTopic;
@@ -415,8 +413,7 @@
dlqProducer = .. // get a producer from the configs map
dlqTopic = .. // get the topic name from the configs map
}
}
</pre>
}</code></pre>
</div></blockquote>
</div>
@@ -427,10 +424,10 @@
such as attempting to produce a record that is too large. By default, Kafka provides and uses the <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.html">DefaultProductionExceptionHandler</a>
that always fails when these exceptions occur.</p>
<p>Each exception handler can return a <code>FAIL</code> or <code>CONTINUE</code> depending on the record and the exception thrown. Returning <code>FAIL</code> will signal that Streams should shut down and <code>CONTINUE</code> will signal that Streams
should ignore the issue and continue processing. If you want to provide an exception handler that always ignores records that are too large, you could implement something like the following:</p>
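<p>
The handler itself is not included in this hunk; a minimal sketch (the class name <code>IgnoreRecordTooLargeHandler</code> is illustrative) that ignores oversized records and fails on everything else could look like this:
</p>
<pre class="line-numbers"><code class="language-java">import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
    @Override
    public void configure(final Map<String, ?> configs) {}

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        // Skip records that are too large; fail the application on any other error.
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        } else {
            return ProductionExceptionHandlerResponse.FAIL;
        }
    }
}</code></pre>
<p>
Such a handler would then be registered via the <code>default.production.exception.handler</code> config.
</p>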
<li>Whenever data is read from or written to a <em>Kafka topic</em> (e.g., via the <code class="docutils literal"><span class="pre">StreamsBuilder#stream()</span></code> and <code class="docutils literal"><span class="pre">KStream#to()</span></code> methods).</li>
<li>Whenever data is read from or written to a <em>state store</em>.</li>
</ul>
<p>This is discussed in more detail in <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data types and serialization</span></a>.</p>
</div>
</div></blockquote>
</div>
@@ -629,11 +626,11 @@
</div>
</blockquote>
</div>
<divclass="admonition note">
<pclass="first admonition-title">Note</p>
<pclass="last">If you enable <cite>n</cite> standby tasks, you need to provision <cite>n+1</cite><codeclass="docutils literal"><spanclass="pre">KafkaStreams</span></code>
instances.</p>
</div>
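<p>
A minimal sketch of enabling one standby replica (assuming the standard <code>StreamsConfig</code> constant for <code>num.standby.replicas</code>):
</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class StandbyReplicasExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // n = 1 standby task per stateful task, so provision at least n + 1 = 2 instances.
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
    }
}</code></pre>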
<divclass="admonition note">
<pclass="first admonition-title">Note</p>
<pclass="last">If you enable <cite>n</cite> standby tasks, you need to provision <cite>n+1</cite><codeclass="docutils literal"><spanclass="pre">KafkaStreams</span></code>
instances.</p>
</div>
<divclass="section"id="num-stream-threads">
<h4><aclass="toc-backref"href="#id11">num.stream.threads</a><aclass="headerlink"href="#num-stream-threads"title="Permalink to this headline"></a></h4>
<blockquote>
@@ -664,22 +661,22 @@
<spanid="streams-developer-guide-processing-guarantee"></span><h4><aclass="toc-backref"href="#id25">processing.guarantee</a><aclass="headerlink"href="#processing-guarantee"title="Permalink to this headline"></a></h4>
<blockquote>
<div>The processing guarantee that should be used.
Possible values are <code class="docutils literal"><span class="pre">"at_least_once"</span></code> (default),
<li><code class="docutils literal"><span class="pre">BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();</span></code> Get a reference to the existing table config rather than create a new one, so you don't accidentally overwrite defaults such as the <code class="docutils literal"><span class="pre">BloomFilter</span></code>, which is an important optimization.</li>
<li><code class="docutils literal"><span class="pre">tableConfig.setBlockSize(16</span> <span class="pre">*</span> <span class="pre">1024L);</span></code> Modify the default <a class="reference external" href="https://github.com/apache/kafka/blob/2.3/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L79">block size</a> per these instructions from the <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#indexes-and-filter-blocks">RocksDB GitHub</a>.</li>
<li><code class="docutils literal"><span class="pre">tableConfig.setCacheIndexAndFilterBlocks(true);</span></code> Do not let the index and filter blocks grow unbounded. For more information, see the <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks">RocksDB GitHub</a>.</li>
<li><code class="docutils literal"><span class="pre">options.setMaxWriteBufferNumber(2);</span></code> See the advanced options in the <a class="reference external" href="https://github.com/facebook/rocksdb/blob/8dee8cad9ee6b70fd6e1a5989a8156650a70c04f/include/rocksdb/advanced_options.h#L103">RocksDB GitHub</a>.</li>
<li><code class="docutils literal"><span class="pre">cache.close();</span></code> To avoid memory leaks, you must close any objects you constructed that extend org.rocksdb.RocksObject. See <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#memory-management">RocksJava docs</a> for more details.</li>
</ol>
</dd>
</dl>
</div></blockquote>
</div>
</div>
</blockquote>
</div>
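<p>
A minimal sketch of a <code>RocksDBConfigSetter</code> that applies the numbered notes above (the class name and the 16 MB cache size are illustrative, not prescriptive):
</p>
<pre class="line-numbers"><code class="language-java">import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;

public class CustomRocksDBConfig implements RocksDBConfigSetter {
    private Cache cache;

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        // Note 1: reuse the existing table config so defaults such as the BloomFilter are kept.
        final BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        cache = new LRUCache(16 * 1024L * 1024L);
        tableConfig.setBlockCache(cache);
        tableConfig.setBlockSize(16 * 1024L);            // Note 2
        tableConfig.setCacheIndexAndFilterBlocks(true);  // Note 3
        options.setTableFormatConfig(tableConfig);
        options.setMaxWriteBufferNumber(2);              // Note 4
    }

    @Override
    public void close(final String storeName, final Options options) {
        cache.close();                                   // Note 5: release RocksObject resources
    }
}</code></pre>
<p>
Such a class would be registered via the <code>rocksdb.config.setter</code> config.
</p>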
<divclass="section"id="state-dir">
<h4><aclass="toc-backref"href="#id14">state.dir</a><aclass="headerlink"href="#state-dir"title="Permalink to this headline"></a></h4>
<blockquote>
<div>The state directory. Kafka Streams persists local states under the state directory. Each application has a subdirectory on its hosting
machine that is located under the state directory. The name of the subdirectory is the application ID. The state stores associated
with the application are created under this subdirectory. When running multiple instances of the same application on a single machine,
this path must be unique for each such instance.</div>
</blockquote>
</div>
<divclass="section"id="topology-optimization">
<h4><aclass="toc-backref"href="#id31">topology.optimization</a><aclass="headerlink"href="#topology-optimization"title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
You can tell Streams to apply topology optimizations by setting this config. The optimizations are currently all or none and disabled by default.
These optimizations include moving/reducing repartition topics and reusing the source topic as the changelog for source KTables. It is recommended to enable this.
</p>
<p>
Note that as of 2.3, you need to do two things to enable optimizations. In addition to setting this config to <code>StreamsConfig.OPTIMIZE</code>, you'll need to pass in your
configuration properties when building your topology by using the overloaded <code>StreamsBuilder.build(Properties)</code> method.
For example <code>KafkaStreams myStream = new KafkaStreams(streamsBuilder.build(properties), properties)</code>.
</p>
</div></blockquote>
</div>
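<p>
A minimal sketch of enabling the optimizations (assuming the <code>StreamsConfig.TOPOLOGY_OPTIMIZATION</code> and <code>StreamsConfig.OPTIMIZE</code> constants available in this release line):
</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class TopologyOptimizationExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

        final StreamsBuilder builder = new StreamsBuilder();
        // ... define the topology on `builder` ...

        // Pass the properties into build() so the optimizer can see them.
        final KafkaStreams streams = new KafkaStreams(builder.build(props), props);
        streams.start();
    }
}</code></pre>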
<divclass="section"id="upgrade-from">
<h4><aclass="toc-backref"href="#id14">upgrade.from</a><aclass="headerlink"href="#upgrade-from"title="Permalink to this headline"></a></h4>
<blockquote>
<div>
The version you are upgrading from. It is important to set this config when performing a rolling upgrade to certain versions, as described in the upgrade guide.
You should set this config to the appropriate version before bouncing your instances and upgrading them to the newer version. Once everyone is on the
newer version, you should remove this config and do a second rolling bounce. It is only necessary to set this config and follow the two-bounce upgrade path
when upgrading from below version 2.0, or when upgrading to 2.4+ from any version lower than 2.4.
</div>
</blockquote>
<dlclass="docutils">
<dt>Notes for example:</dt>
<dd><olclass="first last arabic simple">
<li><codeclass="docutils literal"><spanclass="pre">BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();</span></code> Get a reference to the existing table config rather than create a new one, so you don't accidentally overwrite defaults such as the <codeclass="docutils literal"><spanclass="pre">BloomFilter</span></code>, which is an important optimization.
<li><codeclass="docutils literal"><spanclass="pre">tableConfig.setBlockSize(16</span><spanclass="pre">*</span><spanclass="pre">1024L);</span></code> Modify the default <aclass="reference external"href="https://github.com/apache/kafka/blob/2.3/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L79">block size</a> per these instructions from the <aclass="reference external"href="https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#indexes-and-filter-blocks">RocksDB GitHub</a>.</li>
<li><codeclass="docutils literal"><spanclass="pre">tableConfig.setCacheIndexAndFilterBlocks(true);</span></code> Do not let the index and filter blocks grow unbounded. For more information, see the <aclass="reference external"href="https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks">RocksDB GitHub</a>.</li>
<li><codeclass="docutils literal"><spanclass="pre">options.setMaxWriteBufferNumber(2);</span></code> See the advanced options in the <aclass="reference external"href="https://github.com/facebook/rocksdb/blob/8dee8cad9ee6b70fd6e1a5989a8156650a70c04f/include/rocksdb/advanced_options.h#L103">RocksDB GitHub</a>.</li>
<li><codeclass="docutils literal"><spanclass="pre">cache.close();</span></code> To avoid memory leaks, you must close any objects you constructed that extend org.rocksdb.RocksObject. See <aclass="reference external"href="https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#memory-management">RocksJava docs</a> for more details.</li>
<h3><aclass="toc-backref"href="#id16">Kafka consumers, producer and admin client configuration parameters</a><aclass="headerlink"href="#kafka-consumers-and-producer-configuration-parameters"title="Permalink to this headline"></a></h3>
<p>You can specify parameters for the Kafka <aclass="reference external"href="/{{version}}/javadoc/org/apache/kafka/clients/consumer/package-summary.html">consumers</a>, <aclass="reference external"href="/{{version}}/javadoc/org/apache/kafka/clients/producer/package-summary.html">producers</a>,
and <aclass="reference external"href="/{{version}}/javadoc/org/apache/kafka/kafka/clients/admin/package-summary.html">admin client</a> that are used internally.
The consumer, producer and admin client settings are defined by specifying parameters in a <codeclass="docutils literal"><spanclass="pre">StreamsConfig</span></code> instance.</p>
<p>In this example, the Kafka <aclass="reference external"href="/{{version}}/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#SESSION_TIMEOUT_MS_CONFIG">consumer session timeout</a> is configured to be 60000 milliseconds in the Streams settings:</p>
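<p>
The referenced snippet is not part of this hunk; a sketch using the <code>StreamsConfig.consumerPrefix</code> helper could look like this:
</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class ClientConfigExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Customize the internal consumer's session timeout (60 seconds).
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 60000);
    }
}</code></pre>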
<h4><aclass="toc-backref"href="#id14">state.dir</a><aclass="headerlink"href="#state-dir"title="Permalink to this headline"></a></h4>
<blockquote>
<div>The state directory. Kafka Streams persists local states under the state directory. Each application has a subdirectory on its hosting
machine that is located under the state directory. The name of the subdirectory is the application ID. The state stores associated
with the application are created under this subdirectory. When running multiple instances of the same application on a single machine,
this path must be unique for each such instance.</div>
</blockquote>
</div>
<divclass="section"id="topology-optimization">
<h4><aclass="toc-backref"href="#id31">topology.optimization</a><aclass="headerlink"href="#topology-optimization"title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
You can tell Streams to apply topology optimizations by setting this config. The optimizations are currently all or none and disabled by default.
These optimizations include moving/reducing repartition topics and reusing the source topic as the changelog for source KTables. It is recommended to enable this.
</p>
<p>
Note that as of 2.3, you need to do two things to enable optimizations. In addition to setting this config to <code>StreamsConfig.OPTIMIZE</code>, you'll need to pass in your
configuration properties when building your topology by using the overloaded <code>StreamsBuilder.build(Properties)</code> method.
For example <code>KafkaStreams myStream = new KafkaStreams(streamsBuilder.build(properties), properties)</code>.
</p>
</div></blockquote>
</div>
<divclass="section"id="upgrade-from">
<h4><aclass="toc-backref"href="#id14">upgrade.from</a><aclass="headerlink"href="#upgrade-from"title="Permalink to this headline"></a></h4>
<blockquote>
<div>
The version you are upgrading from. It is important to set this config when performing a rolling upgrade to certain versions, as described in the upgrade guide.
You should set this config to the appropriate version before bouncing your instances and upgrading them to the newer version. Once everyone is on the
newer version, you should remove this config and do a second rolling bounce. It is only necessary to set this config and follow the two-bounce upgrade path
when upgrading from below version 2.0, or when upgrading to 2.4+ from any version lower than 2.4.
<h3><aclass="toc-backref"href="#id16">Kafka consumers, producer and admin client configuration parameters</a><aclass="headerlink"href="#kafka-consumers-and-producer-configuration-parameters"title="Permalink to this headline"></a></h3>
<p>You can specify parameters for the Kafka <aclass="reference external"href="/{{version}}/javadoc/org/apache/kafka/clients/consumer/package-summary.html">consumers</a>, <aclass="reference external"href="/{{version}}/javadoc/org/apache/kafka/clients/producer/package-summary.html">producers</a>,
and <aclass="reference external"href="/{{version}}/javadoc/org/apache/kafka/kafka/clients/admin/package-summary.html">admin client</a> that are used internally.
The consumer, producer and admin client settings are defined by specifying parameters in a <codeclass="docutils literal"><spanclass="pre">StreamsConfig</span></code> instance.</p>
<p>In this example, the Kafka <aclass="reference external"href="/{{version}}/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#SESSION_TIMEOUT_MS_CONFIG">consumer session timeout</a> is configured to be 60000 milliseconds in the Streams settings:</p>
<h4><aclass="toc-backref"href="#id17">Naming</a><aclass="headerlink"href="#naming"title="Permalink to this headline"></a></h4>
<p>Some consumer, producer and admin client configuration parameters use the same parameter name, and Kafka Streams library itself also uses some parameters that share the same name with its embedded client. For example, <codeclass="docutils literal"><spanclass="pre">send.buffer.bytes</span></code> and
<codeclass="docutils literal"><spanclass="pre">receive.buffer.bytes</span></code> are used to configure TCP buffers; <codeclass="docutils literal"><spanclass="pre">request.timeout.ms</span></code> and <codeclass="docutils literal"><spanclass="pre">retry.backoff.ms</span></code> control retries for client request;
<codeclass="docutils literal"><spanclass="pre">retries</span></code> are used to configure how many retries are allowed when handling retriable errors from broker request responses.
You can avoid duplicate names by prefix parameter names with <codeclass="docutils literal"><spanclass="pre">consumer.</span></code>, <codeclass="docutils literal"><spanclass="pre">producer.</span></code>, or <codeclass="docutils literal"><spanclass="pre">admin.</span></code> (e.g., <codeclass="docutils literal"><spanclass="pre">consumer.send.buffer.bytes</span></code> and <codeclass="docutils literal"><spanclass="pre">producer.send.buffer.bytes</span></code>).</p>
<h4><aclass="toc-backref"href="#id17">Naming</a><aclass="headerlink"href="#naming"title="Permalink to this headline"></a></h4>
<p>Some consumer, producer and admin client configuration parameters use the same parameter name, and Kafka Streams library itself also uses some parameters that share the same name with its embedded client. For example, <codeclass="docutils literal"><spanclass="pre">send.buffer.bytes</span></code> and
<codeclass="docutils literal"><spanclass="pre">receive.buffer.bytes</span></code> are used to configure TCP buffers; <codeclass="docutils literal"><spanclass="pre">request.timeout.ms</span></code> and <codeclass="docutils literal"><spanclass="pre">retry.backoff.ms</span></code> control retries for client request;
<codeclass="docutils literal"><spanclass="pre">retries</span></code> are used to configure how many retries are allowed when handling retriable errors from broker request responses.
You can avoid duplicate names by prefix parameter names with <codeclass="docutils literal"><spanclass="pre">consumer.</span></code>, <codeclass="docutils literal"><spanclass="pre">producer.</span></code>, or <codeclass="docutils literal"><spanclass="pre">admin.</span></code> (e.g., <codeclass="docutils literal"><spanclass="pre">consumer.send.buffer.bytes</span></code> and <codeclass="docutils literal"><spanclass="pre">producer.send.buffer.bytes</span></code>).</p>
<p>You could further separate consumer configuration by adding different prefixes:</p>
<ulclass="simple">
<li><codeclass="docutils literal"><spanclass="pre">main.consumer.</span></code> for main consumer which is the default consumer of stream source.</li>
<li><codeclass="docutils literal"><spanclass="pre">restore.consumer.</span></code> for restore consumer which is in charge of state store recovery.</li>
<li><codeclass="docutils literal"><spanclass="pre">global.consumer.</span></code> for global consumer which is used in global KTable construction.</li>
</ul>
<p>For example, if you only want to set restore consumer config without touching other consumers' settings, you could simply use <code class="docutils literal"><span class="pre">restore.consumer.</span></code> to set the config.</p>
<p> The same applies to <code class="docutils literal"><span class="pre">main.consumer.</span></code> and <code class="docutils literal"><span class="pre">global.consumer.</span></code>, if you only want to specify one consumer type's config.</p>
<p> Additionally, to configure the internal repartition/changelog topics, you could use the <code class="docutils literal"><span class="pre">topic.</span></code> prefix, followed by any of the standard topic configs.</p>
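<p>
A brief sketch of the prefix helpers on <code>StreamsConfig</code> (the specific parameter values are illustrative only):
</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

public class PrefixConfigExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        // Only the restore consumer (state recovery) gets the larger poll batch.
        props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000);
        // Internal repartition/changelog topics pick up standard topic configs via "topic.".
        props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 64 * 1024 * 1024);
    }
}</code></pre>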
<h3><aclass="toc-backref"href="#id26">Parameters controlled by Kafka Streams</a><aclass="headerlink"href="#parameters-controlled-by-kafka-streams"title="Permalink to this headline"></a></h3>
<p>Kafka Streams assigns the following configuration parameters. If you try to change
<codeclass="docutils literal"><spanclass="pre">allow.auto.create.topics</span></code>, your value
is ignored and setting it has no effect in a Kafka Streams application. You can set the other parameters.
Kafka Streams sets them to different default values than a plain
<spanid="streams-developer-guide-consumer-auto-commit"></span><h4><aclass="toc-backref"href="#id19">enable.auto.commit</a><aclass="headerlink"href="#enable-auto-commit"title="Permalink to this headline"></a></h4>
<blockquote>
<div>The consumer auto commit. To guarantee at-least-once processing semantics and turn off auto commits, Kafka Streams overrides this consumer config
value to <code class="docutils literal"><span class="pre">false</span></code>. Consumers will only commit explicitly via <em>commitSync</em> calls when the Kafka Streams library or a user decides
to commit the current processing state.</div></blockquote>
</div>
</div>
</div>
<divclass="section"id="default-values">
<h4><aclass="toc-backref"href="#id18">Default Values</a><aclass="headerlink"href="#default-values"title="Permalink to this headline"></a></h4>
<p>Kafka Streams uses different default values for some of the underlying client configs, which are summarized below. For detailed descriptions
of these configs, see <aclass="reference external"href="http://kafka.apache.org/0100/documentation.html#producerconfigs">Producer Configs</a>
and <aclass="reference external"href="http://kafka.apache.org/0100/documentation.html#newconsumerconfigs">Consumer Configs</a>.</p>
<h3><aclass="toc-backref"href="#id26">Parameters controlled by Kafka Streams</a><aclass="headerlink"href="#parameters-controlled-by-kafka-streams"title="Permalink to this headline"></a></h3>
<p>Kafka Streams assigns the following configuration parameters. If you try to change
<codeclass="docutils literal"><spanclass="pre">allow.auto.create.topics</span></code>, your value
is ignored and setting it has no effect in a Kafka Streams application. You can set the other parameters.
Kafka Streams sets them to different default values than a plain
<spanid="streams-developer-guide-consumer-auto-commit"></span><h4><aclass="toc-backref"href="#id19">enable.auto.commit</a><aclass="headerlink"href="#enable-auto-commit"title="Permalink to this headline"></a></h4>
<blockquote>
<div>The consumer auto commit. To guarantee at-least-once processing semantics and turn off auto commits, Kafka Streams overrides this consumer config
value to <codeclass="docutils literal"><spanclass="pre">false</span></code>. Consumers will only commit explicitly via <em>commitSync</em> calls when the Kafka Streams library or a user decides
to commit the current processing state.</div></blockquote>
<h3><aclass="toc-backref"href="#id21">Recommended configuration parameters for resiliency</a><aclass="headerlink"href="#recommended-configuration-parameters-for-resiliency"title="Permalink to this headline"></a></h3>
<p>There are several Kafka and Kafka Streams configuration options that need to be configured explicitly for resiliency in face of broker failures:</p>
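<p>
The parameter table itself is not reproduced in this hunk; as a hedged sketch, the commonly recommended settings could be applied like this (broker-side <code>min.insync.replicas</code> must additionally be set on the brokers or topics themselves):
</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class ResiliencyConfigExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        // Replicate the internal topics that Streams creates.
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
        // Require all in-sync replicas to acknowledge writes from the internal producer.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
    }
}</code></pre>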
<p>If you want to override serdes selectively, i.e., keep the defaults for some fields, then don’t specify the serde whenever you want to leverage the default settings:</p>
<p>If some of your incoming records are corrupted or ill-formatted, they will cause the deserializer class to report an error.
Since 1.0.x we have introduced a <code>DeserializationExceptionHandler</code> interface which allows
@@ -104,12 +101,11 @@
<h3>Primitive and basic types<a class="headerlink" href="#primitive-and-basic-types" title="Permalink to this headline"></a></h3>
<p>Apache Kafka includes several built-in serde implementations for Java primitives and basic types such as <code class="docutils literal"><span class="pre">byte[]</span></code> in
its <code class="docutils literal"><span class="pre">kafka-clients</span></code> Maven artifact:</p>
<p>This artifact provides the following serde implementations under the package <a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/clients/src/main/java/org/apache/kafka/common/serialization">org.apache.kafka.common.serialization</a>, which you can leverage when e.g., defining default serializers in your Streams configuration.</p>
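<p>
For instance, a sketch of wiring two of these built-in serdes as application-wide defaults (using the default serde config keys defined in <code>StreamsConfig</code>):
</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

public class DefaultSerdeExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        // Use the built-in String serde for keys and the built-in Long serde for values.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
    }
}</code></pre>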
Only the <strong>Kafka Streams DSL</strong> has the notion of a <code>KTable</code>.
@@ -172,7 +172,7 @@
KTable also provides an ability to look up <em>current</em> values of data records by keys. This table-lookup functionality is available through <strong>join operations</strong> (see also <strong>Joining</strong> in the Developer Guide) as well as through <strong>Interactive Queries</strong>.
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Long</span><spanclass="o">()</span><spanclass="cm">/* value serde */</span>
<spanclass="o">);</span>
</pre></div>
<spanclass="o">);</span></code></pre></div>
</div>
<p>If you do not specify SerDes explicitly, the default SerDes from the
<aclass="reference internal"href="config-streams.html#streams-developer-guide-configuration"><spanclass="std std-ref">configuration</span></a> are used.</p>
@@ -304,7 +303,7 @@
<aclass="reference internal"href="../architecture.html#streams_architecture_state"><spanclass="std std-ref">state store</span></a> that backs the table). This is required for
supporting <aclass="reference internal"href="interactive-queries.html#streams-developer-guide-interactive-queries"><spanclass="std std-ref">interactive queries</span></a> against the table. When a
name is not provided the table will not be queryable and an internal name will be provided for the state store.</p>
<spanclass="o">.</span><spanclass="na">withValueSerde</span><spanclass="o">(</span><spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Long</span><spanclass="o">())</span><spanclass="cm">/* value serde */</span>
<spanclass="o">);</span>
</pre></div>
<spanclass="o">);</span></code></pre></div>
</div>
<p>You <strong>must specify SerDes explicitly</strong> if the key or value types of the records in the Kafka input
topics do not match the configured default SerDes. For information about configuring default SerDes, available
@@ -384,8 +382,7 @@
<spanclass="c1">// KStream branches[1] contains all records whose keys start with "B"</span>
<spanclass="c1">// KStream branches[2] contains all other records</span>
<spanclass="c1">// Java 7 example: cf. `filter` for how to create `Predicate` instances</span>
</pre></div>
<spanclass="c1">// Java 7 example: cf. `filter` for how to create `Predicate` instances</span></code></pre></div>
<spanclass="c1">// Java 7 example: cf. `mapValues` for how to create `ValueMapper` instances</span>
</pre></div>
<spanclass="c1">// Java 7 example: cf. `mapValues` for how to create `ValueMapper` instances</span></code></pre></div>
</div>
</td>
</tr>
@@ -504,7 +497,7 @@
<em>further processing</em> of the input data (unlike <code class="docutils literal"><span class="pre">peek</span></code>, which is not a terminal operation).</p>
<p><strong>Note on processing guarantees:</strong> Any side effects of an action (such as writing to external systems) are not
trackable by Kafka, which means they will typically not benefit from Kafka’s processing guarantees.</p>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">String</span><spanclass="o">(),</span><spanclass="cm">/* key (note: type was modified) */</span>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Integer</span><spanclass="o">())</span><spanclass="cm">/* value (note: type was modified) */</span>
from different streams in the merged stream. Relative order is preserved within each input stream though (ie, records within the same input stream are processed in order)</p>
<code><spanclass="pre">repartition()</span></code> operation always triggers repartitioning of the stream, as a result it can be used with embedded Processor API methods (like <code><spanclass="pre">transform()</span></code> et al.) that do not trigger auto repartitioning when key changing operation is performed beforehand.
<divclass="highlight-java"><divclass="highlight"><pre><span></span><spanclass="c1">// Code below is equivalent to the previous Java 8+ example above.</span>
<spanid="streams-developer-guide-dsl-aggregating"></span><h4><aclass="toc-backref"href="#id12">Aggregating</a><aclass="headerlink"href="#aggregating"title="Permalink to this headline"></a></h4>
<spanclass="n">Materialized</span><spanclass="o">.<</span><spanclass="n">String</span><spanclass="o">,</span><spanclass="n">Long</span><spanclass="o">,</span><spanclass="n">KeyValueStore</span><spanclass="o"><</span><spanclass="n">Bytes</span><spanclass="o">,</span><spanclass="kt">byte</span><spanclass="o">[]></span><spanclass="n">as</span><spanclass="o">(</span><spanclass="s">"aggregated-stream-store"</span><spanclass="cm">/* state store name */</span><spanclass="o">)</span>
<spanclass="o">.</span><spanclass="na">withValueSerde</span><spanclass="o">(</span><spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Integer</span><spanclass="o">());</span><spanclass="cm">/* serde for aggregate value */</span>
</pre></div>
<spanclass="o">.</span><spanclass="na">withValueSerde</span><spanclass="o">(</span><spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Integer</span><spanclass="o">());</span><spanclass="cm">/* serde for aggregate value */</span></code></pre></div>
<spanclass="n">Materialized</span><spanclass="o">.<</span><spanclass="n">String</span><spanclass="o">,</span><spanclass="n">Long</span><spanclass="o">,</span><spanclass="n">KeyValueStore</span><spanclass="o"><</span><spanclass="n">Bytes</span><spanclass="o">,</span><spanclass="kt">byte</span><spanclass="o">[]></span><spanclass="n">as</span><spanclass="o">(</span><spanclass="s">"aggregated-table-store"</span><spanclass="cm">/* state store name */</span><spanclass="o">)</span>
<spanclass="o">.</span><spanclass="na">withValueSerde</span><spanclass="o">(</span><spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Integer</span><spanclass="o">());</span><spanclass="cm">/* serde for aggregate value */</span>
</pre></div>
<spanclass="o">.</span><spanclass="na">withValueSerde</span><spanclass="o">(</span><spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Integer</span><spanclass="o">());</span><spanclass="cm">/* serde for aggregate value */</span></code></pre></div>
</div>
<divclass="admonition note">
<p><b>Note</b></p>
@@ -1815,15 +1785,14 @@
produce a join output <em>for each</em> matching record on the other side, and there can be <em>multiple</em> such matching records
in a given join window (cf. the row with timestamp 15 in the join semantics table below, for example).</p>
<p>Join output records are effectively created as follows, leveraging the user-supplied <code class="docutils literal"><span class="pre">ValueJoiner</span></code>:</p>
std-ref">table duals</span></a>. A foreign-key extractor
function is applied to the left record, creating a new intermediate
record that is used to look up and join with the corresponding
primary key on the right hand side table.
The result is a new KTable that represents the changelog stream
of the join operation.</p>
@@ -2516,8 +2478,7 @@
<spanclass="n">KTable</span><spanclass="o"><Long</span><spanclass="o">,</span><spanclass="n">Double</span><spanclass="o">></span><spanclass="n">right</span><spanclass="o">=</span><spanclass="o">...;<br>//This </span><spanclass="o"><spanclass="o"><spanclass="n">foreignKeyExtractor</span></span> simply uses the left-value to map to the right-key.<br></span><spanclass="o"><spanclass="n">Function</span><spanclass="o"><Long</span><spanclass="o">,</span> Long<spanclass="n"></span><spanclass="o">></span><spanclass="n">foreignKeyExtractor</span><spanclass="o">=</span><spanclass="o">(x) -> x;</span><br><br></span><spanclass="c1">// Java 8+ example, using lambda expressions</span>
<pclass="first">The join will be triggered under the
@@ -2576,8 +2537,7 @@
<spanclass="n">KTable</span><spanclass="o"><Long</span><spanclass="o">,</span><spanclass="n">Double</span><spanclass="o">></span><spanclass="n">right</span><spanclass="o">=</span><spanclass="o">...;<br>//This </span><spanclass="o"><spanclass="o"><spanclass="n">foreignKeyExtractor</span></span> simply uses the left-value to map to the right-key.<br></span><spanclass="o"><spanclass="n">Function</span><spanclass="o"><Long</span><spanclass="o">,</span> Long<spanclass="n"></span><spanclass="o">></span><spanclass="n">foreignKeyExtractor</span><spanclass="o">=</span><spanclass="o">(x) -> x;</span><br><br></span><spanclass="c1">// Java 8+ example, using lambda expressions</span>
<spanclass="o">.</span><spanclass="na">withValueSerde</span><spanclass="o">(</span><spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Long</span><spanclass="o">())</span><spanclass="cm">/* left value */</span>
<spanclass="o">.</span><spanclass="na">withValueSerde</span><spanclass="o">(</span><spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Long</span><spanclass="o">())</span><spanclass="cm">/* left value */</span>
<p>The <code class="docutils literal"><span class="pre">GlobalKTable</span></code> is fully bootstrapped upon (re)start of a <code class="docutils literal"><span class="pre">KafkaStreams</span></code> instance, which means the table is fully populated with all the data in the underlying topic that is
available at the time of the startup. The actual data processing begins only once the bootstrapping has completed.</p>
<p><strong>Causes data re-partitioning of the stream if and only if the stream was marked for re-partitioning.</strong></p>
<p>Given the previous session window example, here’s what would happen on an input stream of six records.
When the first three records arrive (upper part of the diagram below), we’d have three sessions (see lower part)
@@ -3401,16 +3352,14 @@ t=5 (blue), which lead to a merge of sessions and an extension of a session, res
<divclass="highlight-java"><divclass="highlight"><pre><span></span><spanclass="c1">// Send an email notification when the view count of a page reaches one thousand.</span>
<divclass="highlight-java"><divclass="highlight"><pre><code><span></span><spanclass="c1">// Send an email notification when the view count of a page reaches one thousand.</span>
how output records are distributed across the partitions of the output topic.</p>
<p>Another variant of <code class="docutils literal"><span class="pre">to</span></code> exists that enables you to dynamically choose which topic to send to for each record via a <code class="docutils literal"><span class="pre">TopicNameExtractor</span></code>
<li><code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.serialization.Serdes</span></code>: Module that contains all primitive SerDes that can be imported as implicits and a helper to create custom SerDes.</li>
</ul>
<p>The library is cross-built with Scala 2.12 and 2.13. To reference the library compiled against Scala {{scalaVersion}}, include the following in your Maven <code>pom.xml</code>:</p>
<p>To use the library compiled against Scala 2.12 replace the <code class="docutils literal"><span class="pre">artifactId</span></code> with <code class="docutils literal"><span class="pre">kafka-streams-scala_2.12</span></code>.</p>
<p>When using SBT, you can reference the correct library using the following:</p>
<span id="streams-developer-guide-dsl-sample-usage"></span><h3><a class="toc-backref" href="#id28">Sample Usage</a><a class="headerlink" href="#scala-dsl-sample-usage" title="Permalink to this headline"></a></h3>
<p>The library works by wrapping the original Java abstractions of Kafka Streams within a Scala wrapper object and then using implicit conversions between them. All the Scala abstractions are named identically to the corresponding Java abstraction, but they reside in a different package of the library e.g. the Scala class <code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.StreamsBuilder</span></code> is a wrapper around <code class="docutils literal"><span class="pre">org.apache.kafka.streams.StreamsBuilder</span></code>, <code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.kstream.KStream</span></code> is a wrapper around <code class="docutils literal"><span class="pre">org.apache.kafka.streams.kstream.KStream</span></code>, and so on.</p>
<p>Here's an example of the classic WordCount program that uses the Scala <code class="docutils literal"><span class="pre">StreamsBuilder</span></code> that builds an instance of <code class="docutils literal"><span class="pre">KStream</span></code> which is a wrapper around Java <code class="docutils literal"><span class="pre">KStream</span></code>. Then we reify to a table and get a <code class="docutils literal"><span class="pre">KTable</span></code>, which, again, is a wrapper around Java <code class="docutils literal"><span class="pre">KTable</span></code>.</p>
<p>The net result is that the following code is structured just like using the Java API, but with Scala and with far fewer type annotations compared to using the Java API directly from Scala. The difference in type annotation usage is more obvious when given an example. Below is an example WordCount implementation that will be used to demonstrate the differences between the Scala and Java API.</p>
<p>In the above code snippet, we don't have to provide any SerDes, <code class="docutils literal"><span class="pre">Grouped</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code> explicitly. They will also not be dependent on any SerDes specified in the config. <strong>In fact all SerDes specified in the config will be ignored by the Scala APIs</strong>. All SerDes and <code class="docutils literal"><span class="pre">Grouped</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code> will be handled through implicit SerDes as discussed later in the <a href="#scala-dsl-implicit-serdes">Implicit SerDes</a> section. The complete independence from configuration based SerDes is what makes this library completely typesafe. Any missing instances of SerDes, <code class="docutils literal"><span class="pre">Grouped</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code> will be flagged as a compile time error.</p>
<p>The library uses the power of <a href="https://docs.scala-lang.org/tour/implicit-parameters.html">Scala implicit parameters</a> to alleviate this concern. As a user you can provide implicit SerDes or implicit values of <code class="docutils literal"><span class="pre">Grouped</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Repartitioned</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code> once and make your code less verbose. In fact you can just have the implicit SerDes in scope and the library will make the instances of <code class="docutils literal"><span class="pre">Grouped</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code> available in scope.</p>
<p>The library also bundles all implicit SerDes of the commonly used primitive types in a Scala module - so just import the module vals and have all SerDes in scope. A similar strategy of modular implicits can be adopted for any user-defined SerDes as well (User-defined SerDes are discussed in the next section).</p>
<p>Here's an example:</p>
<preclass="brush: scala;">
// DefaultSerdes brings into scope implicit SerDes (mostly for primitives)
<preclass="line-numbers"><codeclass="language-scala">// DefaultSerdes brings into scope implicit SerDes (mostly for primitives)
// that will set up all Grouped, Produced, Consumed and Joined instances.
// So all APIs below that accept Grouped, Produced, Consumed or Joined will
// get these instances automatically
@@ -3867,8 +3803,7 @@ val clicksPerRegion: KTable[String, Long] =
<spanid="streams-developer-guide-dsl-scala-dsl-user-defined-serdes"></span><h3><aclass="toc-backref"href="#id30">User-Defined SerDes</a><aclass="headerlink"href="#scala-dsl-user-defined-serdes"title="Permalink to this headline"></a></h3>
<p>When the default primitive SerDes are not enough and we need to define custom SerDes, the usage is exactly the same as above. Just define the implicit SerDes and start building the stream transformation. Here's an example with <codeclass="docutils literal"><spanclass="pre">AvroSerde</span></code>:</p>
<preclass="brush: scala;">
// domain object as a case class
<preclass="line-numbers"><codeclass="language-scala">// domain object as a case class
case class UserClicks(clicks: Long)
// An implicit Serde implementation for the values we want to
@@ -3912,8 +3846,7 @@ val clicksPerRegion: KTable[String, Long] =
.reduce(_ + _)
// Write the (continuously updating) results to the output topic.
<p>After the application has started, you can get access to “CountsKeyValueStore” and then query it via the <a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java">ReadOnlyKeyValueStore</a> API:</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Get the key-value store CountsKeyValueStore</span>
<span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"count for "</span><span class="o">+</span><span class="n">next</span><span class="o">.</span><span class="na">key</span><span class="o">+</span><span class="s">": "</span><span class="o">+</span><span class="n">next</span><span class="o">.</span><span class="na">value</span><span class="o">);</span>
<span class="o">}</span>
</pre></div>
<span class="o">}</span></code></pre></div>
</div>
<p>You can also materialize the results of stateless operators by using the overloaded methods that take a <code class="docutils literal"><span class="pre">queryableStoreName</span></code>
as shown in the example below:</p>
@@ -182,8 +180,7 @@
<spanclass="c1">// do not materialize the result of filtering corresponding to even numbers</span>
<spanclass="c1">// this means that these results will not be materialized and cannot be queried.</span>
<p>After the application has started, you can get access to “CountsWindowStore” and then query it via the <aclass="reference external"href="https://github.com/apache/kafka/blob/{{dotVersion}}/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java">ReadOnlyWindowStore</a> API:</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span><spanclass="c1">// Get the window store named "CountsWindowStore"</span>
<spanclass="n">System</span><spanclass="o">.</span><spanclass="na">out</span><spanclass="o">.</span><spanclass="na">println</span><spanclass="o">(</span><spanclass="s">"Count of 'world' @ time "</span><spanclass="o">+</span><spanclass="n">windowTimestamp</span><spanclass="o">+</span><spanclass="s">" is "</span><spanclass="o">+</span><spanclass="n">next</span><spanclass="o">.</span><spanclass="na">value</span><spanclass="o">);</span>
<li>It is recommended that you provide an interface that restricts access to read-only operations. This prevents users of this API from mutating the state of your running Kafka Streams application out-of-band.</li>
</ul>
<p>The class/interface hierarchy for your custom store might look something like:</p>
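<p>
The hierarchy itself is not shown in this hunk; a hedged sketch with illustrative names might be:
</p>
<pre class="line-numbers"><code class="language-java">import org.apache.kafka.streams.processor.StateStore;

// Read-only view exposed to interactive-query callers.
public interface MyReadableCustomStore<K, V> {
    V read(K key);
}

// Read-write view used by the processors that own the store.
interface MyWriteableCustomStore<K, V> extends MyReadableCustomStore<K, V> {
    void write(K key, V value);
}

// The actual store combines the Streams StateStore lifecycle with the read-write view.
interface MyCustomStore<K, V> extends StateStore, MyWriteableCustomStore<K, V> {
}</code></pre>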
<p>A wrapper class is required because each instance of a Kafka Streams application may run multiple stream tasks and manage
multiple local instances of a particular state store. The wrapper class hides this complexity and lets you query a “logical”
@@ -286,7 +279,7 @@
<codeclass="docutils literal"><spanclass="pre">StateStoreProvider#stores(String</span><spanclass="pre">storeName,</span><spanclass="pre">QueryableStoreType<T></span><spanclass="pre">queryableStoreType)</span></code> returns a <codeclass="docutils literal"><spanclass="pre">List</span></code> of state
stores with the given storeName and of the type as defined by <codeclass="docutils literal"><spanclass="pre">queryableStoreType</span></code>.</p>
<p>Here is an example implementation of the wrapper follows (Java 8+):</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span><spanclass="c1">// We strongly recommended implementing a read-only interface</span>
<divclass="highlight-java"><divclass="highlight"><pre><code><span></span><spanclass="c1">// We strongly recommended implementing a read-only interface</span>
<spanclass="c1">// to restrict usage of the store to safe read operations!</span>
<p>This parameter controls the number of bytes allocated for caching. Specifically, for a processor topology instance with
<codeclass="docutils literal"><spanclass="pre">T</span></code> threads and <codeclass="docutils literal"><spanclass="pre">C</span></code> bytes allocated for caching, each thread will have an even <codeclass="docutils literal"><spanclass="pre">C/T</span></code> bytes to construct its own
@@ -107,8 +106,7 @@
<blockquote>
<div><divclass="highlight-java"><divclass="highlight"><pre><span></span><spanclass="c1">// Disable record cache</span>
<p>Following from the example first shown in section <aclass="reference internal"href="processor-api.html#streams-developer-guide-state-store"><spanclass="std std-ref">State Stores</span></a>, to disable caching, you can
add the <codeclass="docutils literal"><spanclass="pre">withCachingDisabled</span></code> call (note that caches are enabled by default, however there is an explicit <codeclass="docutils literal"><spanclass="pre">withCachingEnabled</span></code>
<supid="fn1">1. INDEX_FILTER_BLOCK_RATIO can be used to set a fraction of the block cache to set aside for "high priority" (aka index and filter) blocks, preventing them from being evicted by data blocks. See the full signature of the <aclass="reference external"href="https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/LRUCache.java#L72">LRUCache constructor</a>.
NOTE: the boolean parameter in the cache constructor lets you control whether the cache should enforce a strict memory limit by failing the read or iteration in the rare cases where it might go larger than its capacity. Due to a
<aclass="reference external"href="https://github.com/facebook/rocksdb/issues/6247">bug in RocksDB</a>, this option cannot be used
if the write buffer memory is also counted against the cache. If you set this to true, you should NOT pass the cache in to the <code>WriteBufferManager</code> and just control the write buffer and cache memory separately.</sup>
<br>
<supid="fn2">2. This must be set in order for INDEX_FILTER_BLOCK_RATIO to take effect (see footnote 1) as described in the <aclass="reference external"href="https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks">RocksDB docs</a></sup>
<spanclass="o">.</span><spanclass="na">withLoggingDisabled</span><spanclass="o">();</span><spanclass="c1">// disable backing up the store to a changelog topic</span>
</pre></div>
<spanclass="o">.</span><spanclass="na">withLoggingDisabled</span><spanclass="o">();</span><spanclass="c1">// disable backing up the store to a changelog topic</span></code></pre></div>
<h2><aclass="toc-backref"href="#id8">Connecting Processors and State Stores</a><aclass="headerlink"href="#connecting-processors-and-state-stores"title="Permalink to this headline"></a></h2>
@@ -409,8 +403,7 @@
to generate input data streams into the topology, and sink processors with the specified Kafka topics to generate
output data streams out of the topology.</p>
<p>Here is an example implementation:</p>
<preclass="brush: java">
Topology builder = new Topology();
<preclass="line-numbers"><codeclass="language-java"> Topology builder = new Topology();
// add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")
// add the WordCountProcessor node which takes the source processor as its upstream processor
@@ -419,8 +412,7 @@
.addStateStore(countStoreBuilder, "Process")
// add the sink processor node that takes Kafka topic "sink-topic" as output
// and the WordCountProcessor node as its upstream processor
<p>Here is a quick explanation of this example:</p>
<ulclass="simple">
<li>A source processor node named <codeclass="docutils literal"><spanclass="pre">"Source"</span></code> is added to the topology using the <codeclass="docutils literal"><spanclass="pre">addSource</span></code> method, with one Kafka topic
@@ -437,8 +429,7 @@
This can be done by implementing <codeclass="docutils literal"><spanclass="pre">ConnectedStoreProvider#stores()</span></code> on the <codeclass="docutils literal"><spanclass="pre">ProcessorSupplier</span></code>
instead of calling <codeclass="docutils literal"><spanclass="pre">Topology#addStateStore()</span></code>, like this:
</p>
<preclass="brush: java">
Topology builder = new Topology();
<preclass="line-numbers"><codeclass="language-java"> Topology builder = new Topology();
// add the source processor node that takes Kafka "source-topic" as input
builder.addSource("Source", "source-topic")
// add the WordCountProcessor node which takes the source processor as its upstream processor.
@@ -453,8 +444,7 @@
}, "Source")
// add the sink processor node that takes Kafka topic "sink-topic" as output
// and the WordCountProcessor node as its upstream processor
<p>This allows for a processor to "own" state stores, effectively encapsulating their usage from the user wiring the topology.
Multiple processors that share a state store may provide the same store with this technique, as long as the <codeclass="docutils literal"><spanclass="pre">StoreBuilder</span></code> is the same <codeclass="docutils literal"><spanclass="pre">instance</span></code>.</p>
<p>In these topologies, the <codeclass="docutils literal"><spanclass="pre">"Process"</span></code> stream processor node is considered a downstream processor of the <codeclass="docutils literal"><spanclass="pre">"Source"</span></code> node, and an
<spanid="streams-developer-guide-execution-starting"></span><h2><aclass="toc-backref"href="#id3">Starting a Kafka Streams application</a><aclass="headerlink"href="#starting-a-kafka-streams-application"title="Permalink to this headline"></a></h2>
<p>You can package your Java application as a fat JAR file and then start the application like this:</p>
<divclass="highlight-bash"><divclass="highlight"><pre><span></span><spanclass="c1"># Start the application in class `com.example.MyStreamsApp`</span>
<divclass="highlight-bash"><divclass="highlight"><pre><code><span></span><spanclass="c1"># Start the application in class `com.example.MyStreamsApp`</span>
<spanclass="c1"># from the fat JAR named `path-to-app-fatjar.jar`.</span>
<aclass="reference internal"href="config-streams.html#acceptable-recovery-lag"><spanclass="std std-ref"><code>acceptable.recovery.lag</code></span></a>, if one exists. This means that
most of the time, a task migration will <b>not</b> result in downtime for that task. It will remain active on the instance that's already caught up, while the instance that it's being
migrated to works on restoring the state. Streams will <aclass="reference internal"href="config-streams.html#probing-rebalance-interval-ms"><spanclass="std std-ref">regularly probe</span></a> for warmup tasks that have finished restoring and transition them to active tasks when ready.
</p>
<p>
Note, the one exception to this task availability is if none of the instances have a caught up version of that task. In that case, we have no choice but to assign the active
task to an instance that is not caught up and will have to block further processing on restoration of the task's state from the changelog. If high availability is important
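<p>
As a rough sketch (assuming the KIP-441 related configuration constants available in <code>StreamsConfig</code> since 2.6; the values below are illustrative only), the high-availability assignment behavior can be tuned like this:
</p>
<pre class="line-numbers"><code class="language-java">Properties props = new Properties();
// Maximum changelog lag (in offsets) at which an instance still counts as "caught up"
props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 10000L);
// Extra standby replicas used only to warm up state before a task migration
props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 2);
// How often to probe for warmup tasks that have finished restoring
props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 10 * 60 * 1000L);</code></pre>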
<p>Configure these settings in the application for your <codeclass="docutils literal"><spanclass="pre">Properties</span></code> instance. These settings will encrypt any
data-in-transit that is being read from or written to Kafka, and your application will authenticate itself against the
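<p>As a non-authoritative sketch, a TLS setup could be configured roughly as follows (the keystore/truststore locations and passwords are placeholders you would replace with your own):</p>
<pre class="line-numbers"><code class="language-java">Properties props = new Properties();
// Encrypt data-in-transit and authenticate against the cluster via TLS (placeholder paths/passwords)
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "/etc/security/tls/kafka.client.truststore.jks");
props.put("ssl.truststore.password", "changeit");
props.put("ssl.keystore.location", "/etc/security/tls/kafka.client.keystore.jks");
props.put("ssl.keystore.password", "changeit");
props.put("ssl.key.password", "changeit");</code></pre>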
<p>If you incorrectly configure a security setting in your application, it will fail at runtime, typically right after you
start it. For example, if you enter an incorrect password for the <codeclass="docutils literal"><spanclass="pre">ssl.keystore.password</span></code> setting, an error message
similar to this would be logged and then the application would terminate:</p>
<pclass="last">See the section <aclass="reference internal"href="datatypes.html#streams-developer-guide-serdes"><spanclass="std std-ref">Data Types and Serialization</span></a> for more information about Serializers/Deserializers.</p>
</div>
<p>Example <codeclass="docutils literal"><spanclass="pre">pom.xml</span></code> snippet when using Maven:</p>
<h2>Using Kafka Streams within your application code<aclass="headerlink"href="#using-kafka-streams-within-your-application-code"title="Permalink to this headline"></a></h2>
<p>At this point, internal structures are initialized, but the processing is not started yet.
You have to explicitly start the Kafka Streams thread by calling the <codeclass="docutils literal"><spanclass="pre">KafkaStreams#start()</span></code> method:</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span><spanclass="c1">// Start the Kafka Streams threads</span>
<p>If there are other instances of this stream processing application running elsewhere (e.g., on another machine), Kafka
Streams transparently re-assigns tasks from the existing instances to the new instance that you just started.
For more information, see <aclass="reference internal"href="../architecture.html#streams_architecture_tasks"><spanclass="std std-ref">Stream Partitions and Tasks</span></a> and <aclass="reference internal"href="../architecture.html#streams_architecture_threads"><spanclass="std std-ref">Threading Model</span></a>.</p>
<p>To catch any unexpected exceptions, you can set an <codeclass="docutils literal"><spanclass="pre">java.lang.Thread.UncaughtExceptionHandler</span></code> before you start the
application. This handler is called whenever a stream thread is terminated by an unexpected exception:</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span><spanclass="c1">// Java 8+, using lambda expressions</span>
<divclass="highlight-java"><divclass="highlight"><pre><code><span></span><spanclass="c1">// Java 8+, using lambda expressions</span>
<spanclass="number">4</span><spanclass="video__text">Transforming Data Pt. 11</span>
<spanclass="video-number">4</span><spanclass="video__text">Transforming Data Pt. 11</span>
</p>
</div>
</div>
@@ -103,7 +103,7 @@
<pclass="grid__item__customer__description extra__space">As the leading online fashion retailer in Europe, Zalando uses Kafka as an ESB (Enterprise Service Bus), which helps us in transitioning from a monolithic to a micro services architecture. Using Kafka for processing
<ahref="https://www.confluent.io/blog/ranking-websites-real-time-apache-kafkas-streams-api/"target='blank'> event streams</a> enables our technical team to do near-real time business intelligence.
This quickstart example will demonstrate how to run a streaming application coded in this library. Here is the gist
of the <code><ahref="https://github.com/apache/kafka/blob/{{dotVersion}}/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java">WordCountDemo</a></code> example code (converted to use Java 8 lambda expressions for easy reading).
</p>
<preclass="brush: java;">
// Serializers/deserializers (serde) for String and Long types
<preclass="line-numbers"><codeclass="language-java">// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
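<p>Continuing the gist (a sketch rather than the verbatim demo code, reusing the serdes declared above; the topic names match this quickstart), the core of the topology is roughly the following:</p>
<pre class="line-numbers"><code class="language-java">StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> textLines = builder.stream("streams-plaintext-input",
    Consumed.with(stringSerde, stringSerde));

KTable<String, Long> wordCounts = textLines
    // Split each text line into lower-cased words
    .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
    // Group the split records by word so we can count occurrences per word
    .groupBy((key, word) -> word)
    // Count the occurrences of each word (record key)
    .count();

// Write the running counts as a changelog stream to the output topic
wordCounts.toStream().to("streams-wordcount-output", Produced.with(stringSerde, longSerde));</code></pre>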
@@ -94,10 +92,8 @@ because it cannot know when it has processed "all" the input data.
<ahref="https://www.apache.org/dyn/closer.cgi?path=/kafka/{{fullDotVersion}}/kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz"title="Kafka downloads">Download</a> the {{fullDotVersion}} release and un-tar it.
Note that there are multiple downloadable Scala versions and we choose to use the recommended version ({{scalaVersion}}) here:
<preclass="brush: bash;">
> tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz
> cd kafka_{{scalaVersion}}-{{fullDotVersion}}
</pre>
<preclass="line-numbers"><codeclass="language-bash">> tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz
> cd kafka_{{scalaVersion}}-{{fullDotVersion}}</code></pre>
<h4><aid="quickstart_streams_startserver"href="#quickstart_streams_startserver">Step 2: Start the Kafka server</a></h4>
@@ -105,79 +101,63 @@ Note that there are multiple downloadable Scala versions and we choose to use th
Kafka uses <ahref="https://zookeeper.apache.org/">ZooKeeper</a> so you need to first start a ZooKeeper server if you don't already have one. You can use the convenience script packaged with Kafka to get a quick-and-dirty single-node ZooKeeper instance.
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...</code></pre>
<h4><aid="quickstart_streams_prepare"href="#quickstart_streams_prepare">Step 3: Prepare input topic and start Kafka producer</a></h4>
<!--
<preclass="brush: bash;">
> echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt
</pre>
<preclass="line-numbers"><codeclass="language-bash">> echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt</code></pre>
Or on Windows:
<preclass="brush: bash;">
> echo all streams lead to kafka> file-input.txt
<preclass="line-numbers"><codeclass="language-bash">> echo all streams lead to kafka> file-input.txt
The demo application will read from the input topic <b>streams-plaintext-input</b>, perform the computations of the WordCount algorithm on each of the read messages,
@@ -187,22 +167,18 @@ Hence there won't be any STDOUT output except log entries as the results are wri
Now we can start the console producer in a separate terminal to write some input data to this topic:
<h4><aid="quickstart_streams_process"href="#quickstart_streams_process">Step 5: Process some data</a></h4>
@@ -211,17 +187,14 @@ Now let's write some message with the console producer into the input topic <b>s
This will send a new message to the input topic, where the message key is null and the message value is the string-encoded text line that you just entered
(in practice, input data for applications will typically be streaming continuously into Kafka, rather than being manually entered as we do in this quickstart):
This message will be processed by the WordCount application and the following output data will be written to the <b>streams-wordcount-output</b> topic and printed by the console consumer:
@@ -234,8 +207,7 @@ all 1
streams 1
lead 1
to 1
kafka 1</code></pre>
<p>
Here, the first column is the Kafka message key in <code>java.lang.String</code> format and represents a word that is being counted, and the second column is the message value in <code>java.lang.Long</code> format, representing the word's latest count.
@@ -245,16 +217,13 @@ Now let's continue writing one more message with the console producer into the i
Enter the text line "hello kafka streams" and hit <RETURN>.
@@ -270,26 +239,22 @@ to 1
kafka 1
hello 1
kafka 2
streams 2</code></pre>
Here the last printed lines <b>kafka 2</b> and <b>streams 2</b> indicate updates to the keys <b>kafka</b> and <b>streams</b> whose counts have been incremented from <b>1</b> to <b>2</b>.
Whenever you write further input messages to the input topic, you will observe new messages being added to the <b>streams-wordcount-output</b> topic,
representing the most recent word counts as computed by the WordCount application.
Let's enter one final input text line "join kafka summit" and hit <RETURN> in the console producer to the input topic <b>streams-plaintext-input</b> before we wrap up this quickstart:
As one can see, the output of the WordCount application is actually a continuous stream of updates, where each output record (i.e. each line in the original output above) is
an updated count of a single word, i.e. a record key such as "kafka". For multiple records with the same key, each later record is an update of the previous one.
@@ -352,10 +316,10 @@ Looking beyond the scope of this concrete example, what Kafka Streams is doing h
It is highly recommended to read the <ahref="/{{version}}/documentation/streams/quickstart">quickstart</a> first on how to run a Streams application written in Kafka Streams if you have not done so.
</p>
<h4><aid="tutorial_maven_setup"href="#tutorial_maven_setup">Setting up a Maven Project</a></h4>
<h4class="anchor-heading"><aid="tutorial_maven_setup"class="anchor-link"></a><ahref="#tutorial_maven_setup">Setting up a Maven Project</a></h4>
<p>
We are going to use a Kafka Streams Maven Archetype for creating a Streams project structure with the following commands:
You can use different values for the <code>groupId</code>, <code>artifactId</code> and <code>package</code> parameters if you like.
Assuming the above parameter values are used, this command will create a project structure that looks like this:
</p>
<preclass="brush: bash;">
> tree streams.examples
<preclass="line-numbers"><codeclass="language-bash">> tree streams.examples
streams-quickstart
|-- pom.xml
|-- src
@@ -70,8 +67,7 @@
| |-- Pipe.java
| |-- WordCount.java
|-- resources
|-- log4j.properties</code></pre>
<p>
The <code>pom.xml</code> file included in the project already has the Streams dependency defined.
@@ -83,26 +79,22 @@
Since we are going to start writing such programs from scratch, we can now delete these examples:
</p>
<preclass="brush: bash;">
> cd streams-quickstart
> rm src/main/java/myapps/*.java
</pre>
<preclass="line-numbers"><codeclass="language-bash">> cd streams-quickstart
> rm src/main/java/myapps/*.java</code></pre>
<h4><aid="tutorial_code_pipe"href="#tutorial_code_pipe">Writing a first Streams application: Pipe</a></h4>
It's coding time now! Feel free to open your favorite IDE and import this Maven project, or simply open a text editor and create a Java file under <code>src/main/java/myapps</code>.
public static void main(String[] args) throws Exception {
}
}</code></pre>
<p>
We are going to fill in the <code>main</code> function to write this pipe program. Note that we will not list the import statements as we go since IDEs can usually add them automatically.
@@ -115,20 +107,16 @@
and <code>StreamsConfig.APPLICATION_ID_CONFIG</code>, which gives the unique identifier of your Streams application to distinguish it from other applications talking to the same Kafka cluster:
</p>
<preclass="brush: java;">
Properties props = new Properties();
<preclass="line-numbers"><codeclass="language-java"> Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assuming that the Kafka broker this application is talking to runs on local machine with port 9092</code></pre>
<p>
In addition, you can customize other configurations in the same map, for example, default serialization and deserialization libraries for the record key-value pairs:
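<p>For example, the default key and value serdes can be set like this (using the built-in String serde here; adjust to your own data types):</p>
<pre class="line-numbers"><code class="language-java">props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());</code></pre>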
As shown above, it illustrates that the constructed topology has two processor nodes, a source node <code>KSTREAM-SOURCE-0000000000</code> and a sink node <code>KSTREAM-SINK-0000000001</code>.
@@ -215,9 +189,7 @@
we can now construct the Streams client with the two components we have just constructed above: the configuration map specified in a <code>java.util.Properties</code> instance and the <code>Topology</code> object.
</p>
<preclass="brush: java;">
final KafkaStreams streams = new KafkaStreams(topology, props);
</pre>
<preclass="line-numbers"><codeclass="language-java"> final KafkaStreams streams = new KafkaStreams(topology, props);</code></pre>
<p>
By calling its <code>start()</code> function we can trigger the execution of this client.
@@ -225,8 +197,7 @@
We can, for example, add a shutdown hook with a countdown latch to capture a user interrupt and close the client upon terminating this program:
</p>
<preclass="brush: java;">
final CountDownLatch latch = new CountDownLatch(1);
<preclass="line-numbers"><codeclass="language-java"> final CountDownLatch latch = new CountDownLatch(1);
As we can see above, a new processor node <code>KSTREAM-FLATMAPVALUES-0000000001</code> is injected into the topology between the original source and sink nodes.
@@ -411,8 +365,7 @@
The complete code looks like this (assuming lambda expression is used):
In order to count the words we can first modify the <code>flatMapValues</code> operator to treat all of them as lower case (assuming lambda expression is used):
In order to do the counting aggregation we have to first specify that we want to key the stream on the value string, i.e. the lower-cased word, with a <code>groupBy</code> operator.
This operator generates a new grouped stream, which can then be aggregated by a <code>count</code> operator, which generates a running count on each of the grouped keys:
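<p>Put together, the counting part of the topology is roughly the following sketch (assuming <code>source</code> is the <code>KStream</code> read from the input topic; the demo additionally names the backing store via <code>Materialized</code>):</p>
<pre class="line-numbers"><code class="language-java">KTable<String, Long> counts = source
    // Split each line into lower-cased words
    .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
    // Re-key the stream on the word itself
    .groupBy((key, word) -> word)
    // Generate a running count per word, backed by a state store
    .count();</code></pre>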
Note that you need brokers with version 2.5 or newer to use this feature.
</p>
<p>
For more highly available stateful applications, we've modified the task assignment algorithm to delay the movement of stateful active tasks to instances
that aren't yet caught up with that task's state. Instead, to migrate a task from one instance to another (e.g., when scaling out),
@@ -135,12 +134,10 @@
tasks to their new owners in the background. Check out <ahref="https://cwiki.apache.org/confluence/x/0i4lBg">KIP-441</a>
for full details, including several new configs for control over this new feature.
</p>
<p>
New end-to-end latency metrics have been added. These task-level metrics will be logged at the INFO level and report the min and max end-to-end latency of a record at the beginning/source node(s)
and end/terminal node(s) of a task. See <ahref="https://cwiki.apache.org/confluence/x/gBkRCQ">KIP-613</a> for more information.
</p>
<p>
As of 2.6.0 Kafka Streams deprecates <code>KStream.through()</code> in favor of the new <code>KStream.repartition()</code> operator
(as per <ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint">KIP-221</a>).
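<p>For example, a <code>through()</code> call can typically be replaced along these lines (the topic and operator names below are placeholders for illustration):</p>
<pre class="line-numbers"><code class="language-java">StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> words = builder.stream("input-topic");

// Before (deprecated): words.through("manually-created-repartition-topic")
// After (2.6+): Streams creates and manages the repartition topic itself
KStream<String, String> repartitioned = words.repartition(Repartitioned.as("words-repartition"));</code></pre>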
@@ -160,7 +157,7 @@
as per <ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-571%3A+Add+option+to+force+remove+members+in+StreamsResetter">KIP-571</a>.
</p>
<h3><aid="streams_api_changes_250"href="#streams_api_changes_250">Streams API changes in 2.5.0</a></h3>
<h3class="anchor-heading"><aid="streams_api_changes_250"class="anchor-link"></a><ahref="#streams_api_changes_250">Streams API changes in 2.5.0</a></h3>
<p>
We add a new <code>cogroup()</code> operator (via <ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup">KIP-150</a>)
that allows aggregating multiple streams in a single operation.
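<p>A rough sketch of the new operator (the input streams <code>clicks</code> and <code>purchases</code> are hypothetical <code>KStream&lt;String, Long&gt;</code> inputs invented for this example):</p>
<pre class="line-numbers"><code class="language-java">// Hypothetical inputs: per-user click counts and purchase counts
KGroupedStream<String, Long> clicksByUser = clicks.groupByKey();
KGroupedStream<String, Long> purchasesByUser = purchases.groupByKey();

// One aggregator shared by both inputs: add the incoming value to the running total
Aggregator<String, Long, Long> sum = (userId, value, total) -> total + value;

// Aggregate both grouped streams into a single "activity" KTable in one pass
KTable<String, Long> activity = clicksByUser
    .cogroup(sum)
    .cogroup(purchasesByUser, sum)
    .aggregate(() -> 0L);</code></pre>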
<h3><aid="streams_api_changes_240"href="#streams_api_changes_240">Streams API changes in 2.4.0</a></h3>
<h3class="anchor-heading"><aid="streams_api_changes_240"class="anchor-link"></a><ahref="#streams_api_changes_240">Streams API changes in 2.4.0</a></h3>
<p>
As of 2.4.0 Kafka Streams offers a KTable-KTable foreign-key join (as per <ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable">KIP-213</a>).
This joiner allows for records to be joined between two KTables with different keys.
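<p>Sketched with simple String values (the topic names and the meaning of the values are assumptions for this example), the join takes a foreign-key extractor in addition to the usual value joiner:</p>
<pre class="line-numbers"><code class="language-java">StreamsBuilder builder = new StreamsBuilder();
// "orders" is keyed by orderId and its value holds the customerId; "customers" is keyed by customerId
KTable<String, String> orders = builder.table("orders", Consumed.with(Serdes.String(), Serdes.String()));
KTable<String, String> customers = builder.table("customers", Consumed.with(Serdes.String(), Serdes.String()));

KTable<String, String> enriched = orders.join(
    customers,
    customerId -> customerId,                               // foreign-key extractor from the order value
    (customerId, customerName) -> customerId + ":" + customerName);</code></pre>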
@@ -277,7 +274,7 @@
Hence, you will need to reset your application to upgrade it.
<h3><aid="streams_api_changes_230"href="#streams_api_changes_230">Streams API changes in 2.3.0</a></h3>
<h3class="anchor-heading"><aid="streams_api_changes_230"class="anchor-link"></a><ahref="#streams_api_changes_230">Streams API changes in 2.3.0</a></h3>
<p>Version 2.3.0 adds the Suppress operator to the <code>kafka-streams-scala</code> KTable API.</p>
@@ -346,13 +343,13 @@
For more details please read <ahref="https://issues.apache.org/jira/browse/KAFKA-8215">KAFKA-8215</a>.
</p>
<h3><aid="streams_notable_changes_221"href="#streams_api_changes_221">Notable changes in Kafka Streams 2.2.1</a></h3>
<h3class="anchor-heading"><aid="streams_notable_changes_221"class="anchor-link"></a><ahref="#streams_notable_changes_221">Notable changes in Kafka Streams 2.2.1</a></h3>
<p>
As of Kafka Streams 2.2.1 a message format 0.11 or higher is required;
this implies that brokers must be on version 0.11.0 or higher.
</p>
<h3><aid="streams_api_changes_220"href="#streams_api_changes_220">Streams API changes in 2.2.0</a></h3>
<h3class="anchor-heading"><aid="streams_api_changes_220"class="anchor-link"></a><ahref="#streams_api_changes_220">Streams API changes in 2.2.0</a></h3>
<p>
We've simplified the <code>KafkaStreams#state</code> transition diagram during the starting up phase a bit in 2.2.0: in older versions the state will transition from <code>CREATED</code> to <code>RUNNING</code>, and then to <code>REBALANCING</code> to get the first
stream task assignment, and then back to <code>RUNNING</code>; starting in 2.2.0 it will transition from <code>CREATED</code> directly to <code>REBALANCING</code> and then to <code>RUNNING</code>.
@@ -369,7 +366,7 @@
used in a try-with-resource statement. For a full list of public interfaces that get impacted please read <ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-376%3A+Implement+AutoClosable+on+appropriate+classes+that+want+to+be+used+in+a+try-with-resource+statement">KIP-376</a>.
</p>
<h3><aid="streams_api_changes_210"href="#streams_api_changes_210">Streams API changes in 2.1.0</a></h3>
<h3class="anchor-heading"><aid="streams_api_changes_210"class="anchor-link"></a><ahref="#streams_api_changes_210">Streams API changes in 2.1.0</a></h3>
<p>
We updated <code>TopologyDescription</code> API to allow for better runtime checking.
Users are encouraged to use <code>#topicSet()</code> and <code>#topicPattern()</code> accordingly on <code>TopologyDescription.Source</code> nodes,
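<p>For instance, the source topics of a described topology can be inspected roughly like this (a sketch assuming a <code>Topology</code> named <code>topology</code>):</p>
<pre class="line-numbers"><code class="language-java">TopologyDescription description = topology.describe();
for (TopologyDescription.Subtopology subtopology : description.subtopologies()) {
    for (TopologyDescription.Node node : subtopology.nodes()) {
        if (node instanceof TopologyDescription.Source) {
            TopologyDescription.Source source = (TopologyDescription.Source) node;
            // One of these may be null, depending on whether the source was subscribed
            // to fixed topics or to a topic pattern
            System.out.println("topics = " + source.topicSet() + ", pattern = " + source.topicPattern());
        }
    }
}</code></pre>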
@@ -469,7 +466,7 @@
different stream instances in one application.
</p>
<h3><aid="streams_api_changes_200"href="#streams_api_changes_200">Streams API changes in 2.0.0</a></h3>
<h3class="anchor-heading"><aid="streams_api_changes_200"class="anchor-link"></a><ahref="#streams_api_changes_200">Streams API changes in 2.0.0</a></h3>
<p>
In 2.0.0 we have added a few new APIs on the <code>ReadOnlyWindowStore</code> interface (for details please read <ahref="#streams_api_changes_200">Streams API changes</a> below).
If you have customized window store implementations that extend the <code>ReadOnlyWindowStore</code> interface you need to make code changes.
@@ -605,7 +602,7 @@
<li><code>StreamsConfig#ZOOKEEPER_CONNECT_CONFIG</code> are removed as we do not need ZooKeeper dependency in Streams any more (it is deprecated since 0.10.2.0). </li>
</ul>
<h3><aid="streams_api_changes_110"href="#streams_api_changes_110">Streams API changes in 1.1.0</a></h3>
<h3class="anchor-heading"><aid="streams_api_changes_110"class="anchor-link"></a><ahref="#streams_api_changes_110">Streams API changes in 1.1.0</a></h3>
<p>
We have added support for methods in <code>ReadOnlyWindowStore</code> which allow for querying <code>WindowStore</code>s without providing keys.
If you have customized window store implementations based on the above interface, you will need to update your code to implement the newly added method as well.
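<p>As an illustration (the store name and types are placeholders, and the lookup uses the <code>store(name, type)</code> method available at the time), all windows can be scanned without specifying a key:</p>
<pre class="line-numbers"><code class="language-java">ReadOnlyWindowStore<String, Long> windowStore =
    streams.store("counts-window-store", QueryableStoreTypes.windowStore());

// Iterate over every window of every key -- no key has to be provided
try (KeyValueIterator<Windowed<String>, Long> iterator = windowStore.all()) {
    while (iterator.hasNext()) {
        KeyValue<Windowed<String>, Long> next = iterator.next();
        System.out.println(next.key + " -> " + next.value);
    }
}</code></pre>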
@@ -663,7 +660,7 @@
<li> added options to specify input topics offsets to reset according to <ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-171+-+Extend+Consumer+Group+Reset+Offset+for+Stream+Application">KIP-171</a></li>
</ul>
<h3><aid="streams_api_changes_100"href="#streams_api_changes_100">Streams API changes in 1.0.0</a></h3>
<h3class="anchor-heading"><aid="streams_api_changes_100"class="anchor-link"></a><ahref="#streams_api_changes_100">Streams API changes in 1.0.0</a></h3>
<p>
With 1.0 a major API refactoring was accomplished and the new API is cleaner and easier to use.
@@ -797,7 +794,7 @@
If you already use <code>StateStoreSupplier</code> or <code>Materialized</code> to provide configs for changelogs, then they will take precedence over those supplied in the config.
</p>
<h3><aid="streams_api_changes_0110"href="#streams_api_changes_0110">Streams API changes in 0.11.0.0</a></h3>
<h3class="anchor-heading"><aid="streams_api_changes_0110"class="anchor-link"></a><ahref="#streams_api_changes_0110">Streams API changes in 0.11.0.0</a></h3>
<p> Updates in <code>StreamsConfig</code>: </p>
<ul>
@@ -866,7 +863,7 @@
</ul>
<p><code>[client.Id]</code> is either set via Streams configuration parameter <code>client.id</code> or defaults to <code>[application.id]-[processId]</code> (<code>[processId]</code> is a random UUID). </p>
<h3><aid="streams_api_changes_01021"href="#streams_api_changes_01021">Notable changes in 0.10.2.1</a></h3>
<h3class="anchor-heading"><aid="streams_api_changes_01021"class="anchor-link"></a><ahref="#streams_api_changes_01021">Notable changes in 0.10.2.1</a></h3>
<p>
Parameter updates in <code>StreamsConfig</code>:
@@ -875,7 +872,7 @@
<li> The default config values of embedded producer's <code>retries</code> and consumer's <code>max.poll.interval.ms</code> have been changed to improve the resiliency of a Kafka Streams application </li>
</ul>
<h3><aid="streams_api_changes_0102"href="#streams_api_changes_0102">Streams API changes in 0.10.2.0</a></h3>
<h3class="anchor-heading"><aid="streams_api_changes_0102"class="anchor-link"></a><ahref="#streams_api_changes_0102">Streams API changes in 0.10.2.0</a></h3>
<p>
New methods in <code>KafkaStreams</code>:
@@ -946,7 +943,7 @@
<p> Relaxed type constraints of many DSL interfaces, classes, and methods (cf. <ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-100+-+Relax+Type+constraints+in+Kafka+Streams+API">KIP-100</a>). </p>
<h3><aid="streams_api_changes_0101"href="#streams_api_changes_0101">Streams API changes in 0.10.1.0</a></h3>
<h3class="anchor-heading"><aid="streams_api_changes_0101"class="anchor-link"></a><ahref="#streams_api_changes_0101">Streams API changes in 0.10.1.0</a></h3>
<p> Stream grouping and aggregation split into two methods: </p>