<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements. See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->

<script><!--#include virtual="../../js/templateData.js" --></script>

<script id="content-template" type="text/x-handlebars-template">
  <!-- h1>Developer Guide for Kafka Streams</h1 -->
  <div class="sub-nav-sticky">
    <div class="sticky-top">
      <!-- div style="height:35px">
        <a href="/{{version}}/documentation/streams/">Introduction</a>
        <a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
        <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
        <a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
        <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
      </div -->
    </div>
  </div>

<div class="section" id="processor-api"> |
|
<span id="streams-developer-guide-processor-api"></span><h1>Processor API<a class="headerlink" href="#processor-api" title="Permalink to this headline"></a></h1> |
|
<p>The Processor API allows developers to define and connect custom processors and to interact with state stores. With the |
|
Processor API, you can define arbitrary stream processors that process one received record at a time, and connect these |
|
processors with their associated state stores to compose the processor topology that represents a customized processing |
|
logic.</p> |
|
<div class="contents local topic" id="table-of-contents"> |
|
<p class="topic-title first"><b>Table of Contents</b></p> |
|
<ul class="simple"> |
|
<li><a class="reference internal" href="#overview" id="id1">Overview</a></li> |
|
<li><a class="reference internal" href="#defining-a-stream-processor" id="id2">Defining a Stream |
|
Processor</a></li> |
|
<li><a class="reference internal" href="#unit-testing-processors" id="id9">Unit Testing Processors</a></li> |
|
<li><a class="reference internal" href="#state-stores" id="id3">State Stores</a> |
|
<ul> |
|
<li><a class="reference internal" href="#defining-and-creating-a-state-store" id="id4">Defining and creating a State Store</a></li> |
|
<li><a class="reference internal" href="#fault-tolerant-state-stores" id="id5">Fault-tolerant State Stores</a></li> |
|
<li><a class="reference internal" href="#enable-or-disable-fault-tolerance-of-state-stores-store-changelogs" id="id6">Enable or Disable Fault Tolerance of State Stores (Store Changelogs)</a></li> |
|
<li><a class="reference internal" href="#implementing-custom-state-stores" id="id7">Implementing Custom State Stores</a></li> |
|
</ul> |
|
</li> |
|
<li><a class="reference internal" href="#connecting-processors-and-state-stores" id="id8">Connecting Processors and State Stores</a></li> |
|
<li><a class="reference internal" href="#accessing-processor-context" id="id10">Accessing Processor Context</a></li> |
|
</ul> |
|
</div> |
|
<div class="section" id="overview"> |
|
<h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#overview" title="Permalink to this headline"></a></h2> |
|
<p>The Processor API can be used to implement both <strong>stateless</strong> as well as <strong>stateful</strong> operations, where the latter is |
|
achieved through the use of <a class="reference internal" href="#streams-developer-guide-state-store"><span class="std std-ref">state stores</span></a>.</p> |
|
<div class="admonition tip"> |
|
<p><b>Tip</b></p> |
|
<p class="last"><strong>Combining the DSL and the Processor API:</strong> |
|
You can combine the convenience of the DSL with the power and flexibility of the Processor API as described in the |
|
section <a class="reference internal" href="dsl-api.html#streams-developer-guide-dsl-process"><span class="std std-ref">Applying processors and transformers (Processor API integration)</span></a>.</p> |
|
</div> |
|
<p>For a complete list of available API functionality, see the <a href="../../../javadoc/org/apache/kafka/streams/package-summary.html">Streams</a> API docs.</p> |
|
</div> |
|
<div class="section" id="defining-a-stream-processor"> |
|
<span id="streams-developer-guide-stream-processor"></span><h2><a class="toc-backref" href="#id2">Defining a Stream Processor</a><a class="headerlink" href="#defining-a-stream-processor" title="Permalink to this headline"></a></h2> |
|
<p>A <a class="reference internal" href="../concepts.html#streams-concepts"><span class="std std-ref">stream processor</span></a> is a node in the processor topology that represents a single processing step. |
|
With the Processor API, you can define arbitrary stream processors that processes one received record at a time, and connect |
|
these processors with their associated state stores to compose the processor topology.</p> |
|
<p>You can define a customized stream processor by implementing the <code class="docutils literal"><span class="pre">Processor</span></code> interface, which provides the <code class="docutils literal"><span class="pre">process()</span></code> API method. |
|
The <code class="docutils literal"><span class="pre">process()</span></code> method is called on each of the received records.</p> |
|
      <p>The <code class="docutils literal"><span class="pre">Processor</span></code> interface also has an <code class="docutils literal"><span class="pre">init()</span></code> method, which is called by the Kafka Streams library during the task construction
        phase. Processor instances should perform any required initialization in this method. The <code class="docutils literal"><span class="pre">init()</span></code> method passes in a <code class="docutils literal"><span class="pre">ProcessorContext</span></code>
        instance, which provides access to the metadata of the currently processed record, including its source Kafka topic and partition,
        its corresponding message offset, and further such information. You can also use this context instance to schedule a punctuation
        function (via <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code>), to forward a new record as a key-value pair to the downstream processors (via <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>),
        and to commit the current processing progress (via <code class="docutils literal"><span class="pre">ProcessorContext#commit()</span></code>).
        Any resources you set up in <code class="docutils literal"><span class="pre">init()</span></code> can be cleaned up in the
        <code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
        <code class="docutils literal"><span class="pre">Processor</span></code> object by calling
        <code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
      <p>When records are forwarded to downstream processors, they also get a timestamp assigned. There are two different default behaviors:
        (1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code>, the output record inherits the input record timestamp.
        (2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code>, the output record inherits the current punctuation timestamp (either the current 'stream time' or the system wall-clock time).
        Note that <code class="docutils literal"><span class="pre">#forward()</span></code> also allows you to change this default behavior by passing a custom timestamp for the output record.</p>
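      <p>For example, here is a minimal sketch of overriding the output record timestamp. It assumes a Kafka Streams version that supports <code class="docutils literal"><span class="pre">To#withTimestamp()</span></code> (2.0 or newer); the variable <code>outputTimestamp</code> is an illustrative placeholder:</p>
      <div class="highlight-java"><div class="highlight"><pre>// inside process(): forward the record downstream with an explicit timestamp
// instead of letting it inherit the input record's timestamp
context.forward(key, value, To.all().withTimestamp(outputTimestamp));
</pre></div>
      </div>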
      <p>Specifically, <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> accepts a user <code class="docutils literal"><span class="pre">Punctuator</span></code> callback interface, which triggers its <code class="docutils literal"><span class="pre">punctuate()</span></code>
        API method periodically based on the <code class="docutils literal"><span class="pre">PunctuationType</span></code>. The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
        for the punctuation scheduling: either <a class="reference internal" href="../concepts.html#streams-concepts-time"><span class="std std-ref">stream-time</span></a> or wall-clock-time (by default, stream-time
        is configured to represent event-time via <code class="docutils literal"><span class="pre">TimestampExtractor</span></code>). When stream-time is used, <code class="docutils literal"><span class="pre">punctuate()</span></code> is triggered purely
        by data because stream-time is determined (and advanced forward) by the timestamps derived from the input data. When there
        is no new input data arriving, stream-time is not advanced and thus <code class="docutils literal"><span class="pre">punctuate()</span></code> is not called.</p>
      <p>For example, if you schedule a <code class="docutils literal"><span class="pre">Punctuator</span></code> function every 10 seconds based on <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> and if you
        process a stream of 60 records with consecutive timestamps from 1 (first record) to 60 seconds (last record),
        then <code class="docutils literal"><span class="pre">punctuate()</span></code> would be called 6 times, regardless of the time required to actually process those records: it is
        called 6 times whether processing these 60 records takes a second, a minute, or an hour.</p>
      <p>When wall-clock-time (i.e. <code class="docutils literal"><span class="pre">PunctuationType.WALL_CLOCK_TIME</span></code>) is used, <code class="docutils literal"><span class="pre">punctuate()</span></code> is triggered purely by the wall-clock time.
        Reusing the example above, if the <code class="docutils literal"><span class="pre">Punctuator</span></code> function is scheduled based on <code class="docutils literal"><span class="pre">PunctuationType.WALL_CLOCK_TIME</span></code>, and if these
        60 records were processed within 20 seconds, <code class="docutils literal"><span class="pre">punctuate()</span></code> is called 2 times (one time every 10 seconds). If these 60 records
        were processed within 5 seconds, then no <code class="docutils literal"><span class="pre">punctuate()</span></code> is called at all. Note that you can schedule multiple <code class="docutils literal"><span class="pre">Punctuator</span></code>
        callbacks with different <code class="docutils literal"><span class="pre">PunctuationType</span></code> types within the same processor by calling <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> multiple
        times inside the <code class="docutils literal"><span class="pre">init()</span></code> method, as the sketch below shows.</p>
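      <p>A minimal sketch of scheduling one punctuator per time notion inside <code class="docutils literal"><span class="pre">init()</span></code> (the 10-second intervals are illustrative):</p>
      <div class="highlight-java"><div class="highlight"><pre>@Override
public void init(ProcessorContext context) {
    // fires whenever stream-time advances past the next 10-second boundary,
    // i.e., purely driven by the timestamps of arriving records
    context.schedule(10000, PunctuationType.STREAM_TIME, timestamp -> {
        // periodic, data-driven work goes here
    });

    // fires every 10 seconds of system wall-clock time,
    // independent of whether any input records arrive
    context.schedule(10000, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
        // periodic, wall-clock-driven work goes here
    });
}
</pre></div>
      </div>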
<div class="admonition attention"> |
|
<p class="first admonition-title"><b>Attention</b></p> |
|
<p class="last">Stream-time is only advanced if all input partitions over all input topics have new data (with newer timestamps) available. |
|
If at least one partition does not have any new data available, stream-time will not be advanced and thus <code class="docutils literal"><span class="pre">punctuate()</span></code> will not be triggered if <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> was specified. |
|
This behavior is independent of the configured timestamp extractor, i.e., using <code class="docutils literal"><span class="pre">WallclockTimestampExtractor</span></code> does not enable wall-clock triggering of <code class="docutils literal"><span class="pre">punctuate()</span></code>.</p> |
|
</div> |
|
<p><b>Example</b></p> |
|
      <p>The following example <code class="docutils literal"><span class="pre">Processor</span></code> defines a simple word-count algorithm that performs the following actions:</p>
      <ul class="simple">
        <li>In the <code class="docutils literal"><span class="pre">init()</span></code> method, schedule the punctuation every 1000 time units (the time unit is normally milliseconds, which in this example would translate to punctuation every 1 second) and retrieve the local state store by its name &#8220;Counts&#8221;.</li>
        <li>In the <code class="docutils literal"><span class="pre">process()</span></code> method, upon each received record, split the value string into words, and update their counts in the state store (we will talk about this later in this section).</li>
        <li>In the punctuation callback, iterate the local state store and send the aggregated counts to the downstream processor (we will talk about downstream processors later in this section), and commit the current stream state.</li>
      </ul>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kd">public</span> <span class="kd">class</span> <span class="nc">WordCountProcessor</span> <span class="kd">implements</span> <span class="n">Processor</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="o">{</span> |
|
|
|
<span class="kd">private</span> <span class="n">ProcessorContext</span> <span class="n">context</span><span class="o">;</span> |
|
<span class="kd">private</span> <span class="n">KeyValueStore</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">></span> <span class="n">kvStore</span><span class="o">;</span> |
|
|
|
<span class="nd">@Override</span> |
|
<span class="nd">@SuppressWarnings</span><span class="o">(</span><span class="s">"unchecked"</span><span class="o">)</span> |
|
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="n">ProcessorContext</span> <span class="n">context</span><span class="o">)</span> <span class="o">{</span> |
|
<span class="c1">// keep the processor context locally because we need it in punctuate() and commit()</span> |
|
<span class="k">this</span><span class="o">.</span><span class="na">context</span> <span class="o">=</span> <span class="n">context</span><span class="o">;</span> |
|
|
|
<span class="c1">// retrieve the key-value store named "Counts"</span> |
|
<span class="n">kvStore</span> <span class="o">=</span> <span class="o">(</span><span class="n">KeyValueStore</span><span class="o">)</span> <span class="n">context</span><span class="o">.</span><span class="na">getStateStore</span><span class="o">(</span><span class="s">"Counts"</span><span class="o">);</span> |
|
|
|
<span class="c1">// schedule a punctuate() method every 1000 milliseconds based on stream-time</span> |
|
<span class="k">this</span><span class="o">.</span><span class="na">context</span><span class="o">.</span><span class="na">schedule</span><span class="o">(</span><span class="mi">1000</span><span class="o">,</span> <span class="n">PunctuationType</span><span class="o">.</span><span class="na">STREAM_TIME</span><span class="o">,</span> <span class="o">(</span><span class="n">timestamp</span><span class="o">)</span> <span class="o">-></span> <span class="o">{</span> |
|
<span class="n">KeyValueIterator</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">></span> <span class="n">iter</span> <span class="o">=</span> <span class="k">this</span><span class="o">.</span><span class="na">kvStore</span><span class="o">.</span><span class="na">all</span><span class="o">();</span> |
|
<span class="k">while</span> <span class="o">(</span><span class="n">iter</span><span class="o">.</span><span class="na">hasNext</span><span class="o">())</span> <span class="o">{</span> |
|
<span class="n">KeyValue</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">></span> <span class="n">entry</span> <span class="o">=</span> <span class="n">iter</span><span class="o">.</span><span class="na">next</span><span class="o">();</span> |
|
<span class="n">context</span><span class="o">.</span><span class="na">forward</span><span class="o">(</span><span class="n">entry</span><span class="o">.</span><span class="na">key</span><span class="o">,</span> <span class="n">entry</span><span class="o">.</span><span class="na">value</span><span class="o">.</span><span class="na">toString</span><span class="o">());</span> |
|
<span class="o">}</span> |
|
<span class="n">iter</span><span class="o">.</span><span class="na">close</span><span class="o">();</span> |
|
|
|
<span class="c1">// commit the current processing progress</span> |
|
<span class="n">context</span><span class="o">.</span><span class="na">commit</span><span class="o">();</span> |
|
<span class="o">});</span> |
|
<span class="o">}</span> |
|

    <span class="nd">@Override</span>
    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">process</span><span class="o">(</span><span class="n">String</span> <span class="n">key</span><span class="o">,</span> <span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span>
        <span class="c1">// split the incoming value into words and update each word's count in the state store</span>
        <span class="n">String</span><span class="o">[]</span> <span class="n">words</span> <span class="o">=</span> <span class="n">value</span><span class="o">.</span><span class="na">toLowerCase</span><span class="o">().</span><span class="na">split</span><span class="o">(</span><span class="s">" "</span><span class="o">);</span>

        <span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">word</span> <span class="o">:</span> <span class="n">words</span><span class="o">)</span> <span class="o">{</span>
            <span class="n">Long</span> <span class="n">oldValue</span> <span class="o">=</span> <span class="k">this</span><span class="o">.</span><span class="na">kvStore</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="n">word</span><span class="o">);</span>

            <span class="k">if</span> <span class="o">(</span><span class="n">oldValue</span> <span class="o">==</span> <span class="kc">null</span><span class="o">)</span> <span class="o">{</span>
                <span class="k">this</span><span class="o">.</span><span class="na">kvStore</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">word</span><span class="o">,</span> <span class="mi">1L</span><span class="o">);</span>
            <span class="o">}</span> <span class="k">else</span> <span class="o">{</span>
                <span class="k">this</span><span class="o">.</span><span class="na">kvStore</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">word</span><span class="o">,</span> <span class="n">oldValue</span> <span class="o">+</span> <span class="mi">1L</span><span class="o">);</span>
            <span class="o">}</span>
        <span class="o">}</span>
    <span class="o">}</span>

    <span class="nd">@Override</span>
    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">punctuate</span><span class="o">(</span><span class="kt">long</span> <span class="n">timestamp</span><span class="o">)</span> <span class="o">{</span>
        <span class="c1">// this method is deprecated and should not be used anymore</span>
    <span class="o">}</span>

<span class="nd">@Override</span> |
|
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">close</span><span class="o">()</span> <span class="o">{</span> |
|
<span class="c1">// close any resources managed by this processor</span> |
|
<span class="c1">// Note: Do not close any StateStores as these are managed by the library</span> |
|
<span class="o">}</span> |
|
|
|
<span class="o">}</span> |
|
</pre></div> |
|
</div> |
|
<div class="admonition note"> |
|
<p><b>Note</b></p> |
|
<p class="last"><strong>Stateful processing with state stores:</strong> |
|
The <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> defined above can access the currently received record in its <code class="docutils literal"><span class="pre">process()</span></code> method, and it can |
|
leverage <a class="reference internal" href="#streams-developer-guide-state-store"><span class="std std-ref">state stores</span></a> to maintain processing states to, for example, remember recently |
|
arrived records for stateful processing needs like aggregations and joins. For more information, see the <a class="reference internal" href="#streams-developer-guide-state-store"><span class="std std-ref">state stores</span></a> documentation.</p> |
|
</div> |
|
</div> |
|
<div class="section" id="unit-testing-processors"> |
|
<h2> |
|
<a class="toc-backref" href="#id9">Unit Testing Processors</a> |
|
<a class="headerlink" href="#unit-testing-processors" title="Permalink to this headline"></a> |
|
</h2> |
|
<p> |
|
Kafka Streams comes with a <code>test-utils</code> module to help you write unit tests for your |
|
processors <a href="testing.html#unit-testing-processors">here</a>. |
|
</p> |
|
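      <p>This is a minimal sketch using the <code>TopologyTestDriver</code> from the <code>test-utils</code> artifact (the <code>ConsumerRecordFactory</code>-era API). It assumes the <code>WordCountProcessor</code> from above and a <code>countStoreBuilder</code> for its "Counts" store (see the State Stores section below). Since the processor only forwards results on stream-time punctuation, output may appear only after input record timestamps advance far enough.</p>
      <div class="highlight-java"><div class="highlight"><pre>import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

// wire the processor under test into a small topology
Topology topology = new Topology();
topology.addSource("Source", "source-topic");
topology.addProcessor("Process", () -> new WordCountProcessor(), "Source");
topology.addStateStore(countStoreBuilder, "Process"); // builder for the "Counts" store
topology.addSink("Sink", "sink-topic", "Process");

Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "unit-test");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);

// pipe a record into the topology and read what was forwarded downstream
ConsumerRecordFactory<String, String> factory =
    new ConsumerRecordFactory<>("source-topic", new StringSerializer(), new StringSerializer());
testDriver.pipeInput(factory.create("source-topic", "some-key", "hello kafka streams"));
ProducerRecord<String, String> output =
    testDriver.readOutput("sink-topic", new StringDeserializer(), new StringDeserializer());

testDriver.close();
</pre></div>
      </div>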
    </div>
    <div class="section" id="state-stores">
      <span id="streams-developer-guide-state-store"></span><h2><a class="toc-backref" href="#id3">State Stores</a><a class="headerlink" href="#state-stores" title="Permalink to this headline"></a></h2>
      <p>To implement a <strong>stateful</strong> <code class="docutils literal"><span class="pre">Processor</span></code> or <code class="docutils literal"><span class="pre">Transformer</span></code>, you must provide one or more state stores to the processor
        or transformer (<em>stateless</em> processors or transformers do not need state stores). State stores can be used to remember
        recently received input records, to track rolling aggregates, to de-duplicate input records, and more.
        Another feature of state stores is that they can be
        <a class="reference internal" href="interactive-queries.html#streams-developer-guide-interactive-queries"><span class="std std-ref">interactively queried</span></a> from other applications, such as a
        NodeJS-based dashboard or a microservice implemented in Scala or Go.</p>
      <p>The
        <a class="reference internal" href="#streams-developer-guide-state-store-defining"><span class="std std-ref">available state store types</span></a> in Kafka Streams have
        <a class="reference internal" href="#streams-developer-guide-state-store-fault-tolerance"><span class="std std-ref">fault tolerance</span></a> enabled by default.</p>
<div class="section" id="defining-and-creating-a-state-store"> |
|
<span id="streams-developer-guide-state-store-defining"></span><h3><a class="toc-backref" href="#id4">Defining and creating a State Store</a><a class="headerlink" href="#defining-and-creating-a-state-store" title="Permalink to this headline"></a></h3> |
|
<p>You can either use one of the available store types or |
|
<a class="reference internal" href="#streams-developer-guide-state-store-custom"><span class="std std-ref">implement your own custom store type</span></a>. |
|
It’s common practice to leverage an existing store type via the <code class="docutils literal"><span class="pre">Stores</span></code> factory.</p> |
|
<p>Note that, when using Kafka Streams, you normally don’t create or instantiate state stores directly in your code. |
|
Rather, you define state stores indirectly by creating a so-called <code class="docutils literal"><span class="pre">StoreBuilder</span></code>. This buildeer is used by |
|
Kafka Streams as a factory to instantiate the actual state stores locally in application instances when and where |
|
needed.</p> |
|
<p>The following store types are available out of the box.</p> |
|
<table border="1" class="non-scrolling-table width-100-percent docutils"> |
|
<colgroup> |
|
<col width="19%" /> |
|
<col width="11%" /> |
|
<col width="18%" /> |
|
<col width="51%" /> |
|
</colgroup> |
|
<thead valign="bottom"> |
|
<tr class="row-odd"><th class="head">Store Type</th> |
|
<th class="head">Storage Engine</th> |
|
<th class="head">Fault-tolerant?</th> |
|
<th class="head">Description</th> |
|
</tr> |
|
</thead> |
|
<tbody valign="top"> |
|
<tr class="row-even"><td>Persistent |
|
<code class="docutils literal"><span class="pre">KeyValueStore<K,</span> <span class="pre">V></span></code></td> |
|
<td>RocksDB</td> |
|
<td>Yes (enabled by default)</td> |
|
<td><ul class="first simple"> |
|
<li><strong>The recommended store type for most use cases.</strong></li> |
|
<li>Stores its data on local disk.</li> |
|
<li>Storage capacity: |
|
managed local state can be larger than the memory (heap space) of an |
|
application instance, but must fit into the available local disk |
|
space.</li> |
|
<li>RocksDB settings can be fine-tuned, see |
|
<a class="reference internal" href="config-streams.html#streams-developer-guide-rocksdb-config"><span class="std std-ref">RocksDB configuration</span></a>.</li> |
|
<li>Available <a class="reference external" href="../../../javadoc/org/apache/kafka/streams/state/Stores.PersistentKeyValueFactory.html">store variants</a>: |
|
time window key-value store, session window key-value store.</li> |
|
</ul> |
|
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Creating a persistent key-value store:</span> |
|
<span class="c1">// here, we create a `KeyValueStore<String, Long>` named "persistent-counts".</span> |
|
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.StoreBuilder</span><span class="o">;</span> |
|
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.Stores</span><span class="o">;</span> |
|
|
|
<span class="c1">// Using a `KeyValueStoreBuilder` to build a `KeyValueStore`.</span> |
|
<span class="n">StoreBuilder</span><span class="o"><</span><span class="n">KeyValueStore</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">>></span> <span class="n">countStoreSupplier</span> <span class="o">=</span> |
|
<span class="n">Stores</span><span class="o">.</span><span class="na">keyValueStoreBuilder</span><span class="o">(</span> |
|
<span class="n">Stores</span><span class="o">.</span><span class="na">persistentKeyValueStore</span><span class="o">(</span><span class="s">"persistent-counts"</span><span class="o">),</span> |
|
<span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> |
|
<span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">());</span> |
|
<span class="n">KeyValueStore</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">></span> <span class="n">countStore</span> <span class="o">=</span> <span class="n">countStoreSupplier</span><span class="o">.</span><span class="na">build</span><span class="o">();</span> |
|
</pre></div> |
|
</div> |
|
</td> |
|
</tr> |
|
<tr class="row-odd"><td>In-memory |
|
<code class="docutils literal"><span class="pre">KeyValueStore<K,</span> <span class="pre">V></span></code></td> |
|
<td>-</td> |
|
<td>Yes (enabled by default)</td> |
|
<td><ul class="first simple"> |
|
<li>Stores its data in memory.</li> |
|
<li>Storage capacity: |
|
managed local state must fit into memory (heap space) of an |
|
application instance.</li> |
|
<li>Useful when application instances run in an environment where local |
|
disk space is either not available or local disk space is wiped |
|
in-between app instance restarts.</li> |
|
</ul> |
|
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Creating an in-memory key-value store:</span> |
|
<span class="c1">// here, we create a `KeyValueStore<String, Long>` named "inmemory-counts".</span> |
|
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.StoreBuilder</span><span class="o">;</span> |
|
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.Stores</span><span class="o">;</span> |
|
|
|
<span class="c1">// Using a `KeyValueStoreBuilder` to build a `KeyValueStore`.</span> |
|
<span class="n">StoreBuilder</span><span class="o"><</span><span class="n">KeyValueStore</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">>></span> <span class="n">countStoreSupplier</span> <span class="o">=</span> |
|
<span class="n">Stores</span><span class="o">.</span><span class="na">keyValueStoreBuilder</span><span class="o">(</span> |
|
<span class="n">Stores</span><span class="o">.</span><span class="na">inMemoryKeyValueStore</span><span class="o">(</span><span class="s">"inmemory-counts"</span><span class="o">),</span> |
|
<span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> |
|
<span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">());</span> |
|
<span class="n">KeyValueStore</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">></span> <span class="n">countStore</span> <span class="o">=</span> <span class="n">countStoreSupplier</span><span class="o">.</span><span class="na">build</span><span class="o">();</span> |
|
</pre></div> |
|
</div> |
|
</td> |
|
</tr> |
|
</tbody> |
|
</table> |
|
      </div>
      <div class="section" id="fault-tolerant-state-stores">
        <span id="streams-developer-guide-state-store-fault-tolerance"></span><h3><a class="toc-backref" href="#id5">Fault-tolerant State Stores</a><a class="headerlink" href="#fault-tolerant-state-stores" title="Permalink to this headline"></a></h3>
        <p>To make state stores fault-tolerant and to allow for state store migration without data loss, a state store can be
          continuously backed up to a Kafka topic behind the scenes; this allows, for example, migrating a stateful stream task from one
          machine to another when <a class="reference internal" href="running-app.html#streams-developer-guide-execution-scaling"><span class="std std-ref">elastically adding or removing capacity from your application</span></a>.
          This topic is sometimes referred to as the state store’s associated <em>changelog topic</em>, or its <em>changelog</em>. For example, if
          you experience machine failure, the state store and the application’s state can be fully restored from its changelog. You can
          <a class="reference internal" href="#streams-developer-guide-state-store-enable-disable-fault-tolerance"><span class="std std-ref">enable or disable this backup feature</span></a> for a
          state store.</p>
        <p>By default, persistent key-value stores are fault-tolerant. They are backed by a
          <a class="reference external" href="https://kafka.apache.org/documentation.html#compaction">compacted</a> changelog topic. The purpose of compacting this
          topic is to prevent the topic from growing indefinitely, to reduce the storage consumed in the associated Kafka cluster,
          and to minimize recovery time if a state store needs to be restored from its changelog topic.</p>
        <p>Similarly, persistent window stores are fault-tolerant. They are backed by a topic that uses both compaction and
          deletion. Because of the structure of the message keys that are being sent to the changelog topics, this combination of
          deletion and compaction is required for the changelog topics of window stores. For window stores, the message keys are
          composite keys that include the &#8220;normal&#8221; key and window timestamps. For these types of composite keys it would not
          be sufficient to only enable compaction to prevent a changelog topic from growing out of bounds. With deletion
          enabled, old windows that have expired will be cleaned up by Kafka’s log cleaner as the log segments expire. The
          default retention setting is <code class="docutils literal"><span class="pre">Windows#maintainMs()</span></code> + 1 day. You can override this setting by specifying
          <code class="docutils literal"><span class="pre">StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG</span></code> in the <code class="docutils literal"><span class="pre">StreamsConfig</span></code>.</p>
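        <p>A sketch of overriding this setting through the application’s configuration properties (the one-day value is illustrative):</p>
        <div class="highlight-java"><div class="highlight"><pre>import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// retain window changelog data for one extra day beyond the window retention time
props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, 24 * 60 * 60 * 1000L);
</pre></div>
        </div>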
        <p>When you open an <code class="docutils literal"><span class="pre">Iterator</span></code> from a state store, you must call <code class="docutils literal"><span class="pre">close()</span></code> on the iterator when you are done working with
          it to reclaim resources, or you can use the iterator from within a try-with-resources statement, as shown below. If you do not close an iterator,
          you may encounter an OOM error.</p>
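        <p>A minimal sketch using try-with-resources, assuming the <code class="docutils literal"><span class="pre">kvStore</span></code> from the <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> example above:</p>
        <div class="highlight-java"><div class="highlight"><pre>// the iterator is closed automatically when the block exits, even on error
try (KeyValueIterator<String, Long> iter = kvStore.all()) {
    while (iter.hasNext()) {
        KeyValue<String, Long> entry = iter.next();
        // ... use entry.key and entry.value ...
    }
}
</pre></div>
        </div>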
      </div>
      <div class="section" id="enable-or-disable-fault-tolerance-of-state-stores-store-changelogs">
        <span id="streams-developer-guide-state-store-enable-disable-fault-tolerance"></span><h3><a class="toc-backref" href="#id6">Enable or Disable Fault Tolerance of State Stores (Store Changelogs)</a><a class="headerlink" href="#enable-or-disable-fault-tolerance-of-state-stores-store-changelogs" title="Permalink to this headline"></a></h3>
        <p>You can enable or disable fault tolerance for a state store by enabling or disabling the change logging
          of the store through <code class="docutils literal"><span class="pre">enableLogging()</span></code> and <code class="docutils literal"><span class="pre">disableLogging()</span></code>.
          You can also fine-tune the associated topic’s configuration if needed.</p>
        <p>Example for disabling fault-tolerance:</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.StoreBuilder</span><span class="o">;</span> |
|
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.Stores</span><span class="o">;</span> |
|
|
|
<span class="n">StoreBuilder</span><span class="o"><</span><span class="n">KeyValueStore</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">>></span> <span class="n">countStoreSupplier</span> <span class="o">=</span> <span class="n">Stores</span><span class="o">.</span><span class="na">keyValueStoreBuilder</span><span class="o">(</span> |
|
<span class="n">Stores</span><span class="o">.</span><span class="na">persistentKeyValueStore</span><span class="o">(</span><span class="s">"Counts"</span><span class="o">),</span> |
|
<span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> |
|
<span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">())</span> |
|
<span class="o">.</span><span class="na">withLoggingDisabled</span><span class="o">();</span> <span class="c1">// disable backing up the store to a changelog topic</span> |
|
</pre></div> |
|
</div> |
|
<div class="admonition attention"> |
|
<p class="first admonition-title">Attention</p> |
|
<p class="last">If the changelog is disabled then the attached state store is no longer fault tolerant and it can’t have any <a class="reference internal" href="config-streams.html#streams-developer-guide-standby-replicas"><span class="std std-ref">standby replicas</span></a>.</p> |
|
</div> |
|
        <p>Here is an example of enabling fault tolerance, with additional changelog-topic configuration:
          You can add any log config from <a class="reference external" href="https://github.com/apache/kafka/blob/1.0/core/src/main/scala/kafka/log/LogConfig.scala#L61">kafka.log.LogConfig</a>.
          Unrecognized configs will be ignored.</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.StoreBuilder</span><span class="o">;</span> |
|
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.Stores</span><span class="o">;</span> |
|
|
|
<span class="n">Map</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">changelogConfig</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HashMap</span><span class="o">();</span> |
|
<span class="c1">// override min.insync.replicas</span> |
|
<span class="n">changelogConfig</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"min.insyc.replicas"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">)</span> |
|
|
|
<span class="n">StoreBuilder</span><span class="o"><</span><span class="n">KeyValueStore</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">>></span> <span class="n">countStoreSupplier</span> <span class="o">=</span> <span class="n">Stores</span><span class="o">.</span><span class="na">keyValueStoreBuilder</span><span class="o">(</span> |
|
<span class="n">Stores</span><span class="o">.</span><span class="na">persistentKeyValueStore</span><span class="o">(</span><span class="s">"Counts"</span><span class="o">),</span> |
|
<span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> |
|
<span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">())</span> |
|
<span class="o">.</span><span class="na">withLoggingEnabled</span><span class="o">(</span><span class="n">changlogConfig</span><span class="o">);</span> <span class="c1">// enable changelogging, with custom changelog settings</span> |
|
</pre></div> |
|
</div> |
|
      </div>
      <div class="section" id="implementing-custom-state-stores">
        <span id="streams-developer-guide-state-store-custom"></span><h3><a class="toc-backref" href="#id7">Implementing Custom State Stores</a><a class="headerlink" href="#implementing-custom-state-stores" title="Permalink to this headline"></a></h3>
        <p>You can use the <a class="reference internal" href="#streams-developer-guide-state-store-defining"><span class="std std-ref">built-in state store types</span></a> or implement your own.
          The primary interface to implement for the store is
          <code class="docutils literal"><span class="pre">org.apache.kafka.streams.processor.StateStore</span></code>. Kafka Streams also has a few extended interfaces such
          as <code class="docutils literal"><span class="pre">KeyValueStore</span></code>.</p>
        <p>You also need to provide a &#8220;builder&#8221; for the store by implementing the
          <code class="docutils literal"><span class="pre">org.apache.kafka.streams.state.StoreBuilder</span></code> interface, which Kafka Streams uses to create instances of
          your store.</p>
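        <p>As an illustration, here is a bare-bones sketch of the <code class="docutils literal"><span class="pre">StateStore</span></code> side. <code>MyCustomStore</code> and its fields are hypothetical placeholders, the method set shown matches the classic <code>StateStore</code> interface (verify against your Kafka Streams version), and a real store would additionally expose read/write methods and a matching <code>StoreBuilder</code>:</p>
        <div class="highlight-java"><div class="highlight"><pre>public class MyCustomStore implements StateStore {

    private final String name;
    private boolean open = false;

    public MyCustomStore(final String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public void init(final ProcessorContext context, final StateStore root) {
        // typically: register the store (and a restore callback) with the library,
        // then open any underlying resources
        open = true;
    }

    @Override
    public void flush() {
        // write any buffered data to permanent storage, if applicable
    }

    @Override
    public void close() {
        open = false;
    }

    @Override
    public boolean persistent() {
        return false; // this sketch keeps all of its (hypothetical) data in memory
    }

    @Override
    public boolean isOpen() {
        return open;
    }
}
</pre></div>
        </div>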
      </div>
    </div>
<div class="section" id="accessing-processor-context"> |
|
<h2><a class="toc-backref" href="#id10">Accessing Processor Context</a><a class="headerlink" href="#accessing-processor-context" title="Permalink to this headline"></a></h2> |
|
<p>As we have mentioned in <href=#defining-a-stream-processor>Defining a Stream Processor<\href>, a <code>ProcessorContext</code> control the processing workflow, such as scheduling a punctuation function, and committing the current processed state.</p> |
|
<p>This object can also be used to access the metadata related with the application like |
|
<code class="docutils literal"><span class="pre">applicationId</span></code>, <code class="docutils literal"><span class="pre">taskId</span></code>, |
|
and <code class="docutils literal"><span class="pre">stateDir</span></code>, and also record related metadata as <code class="docutils literal"><span class="pre">topic</span></code>, |
|
<code class="docutils literal"><span class="pre">partition</span></code>, <code class="docutils literal"><span class="pre">offset</span></code>, <code class="docutils literal"><span class="pre">timestamp</span></code> and |
|
<code class="docutils literal"><span class="pre">headers</span></code>.</p> |
|
      <p>Here is an example of how to add a new header to the record being processed (this assumes the processor extends <code class="docutils literal"><span class="pre">AbstractProcessor</span></code>, whose <code class="docutils literal"><span class="pre">context()</span></code> method returns the <code class="docutils literal"><span class="pre">ProcessorContext</span></code>; the header key and value are illustrative):</p>
      <div class="highlight-java"><div class="highlight"><pre><span></span><span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">process</span><span class="o">(</span><span class="n">String</span> <span class="n">key</span><span class="o">,</span> <span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span>
    <span class="c1">// add a header to the record being processed</span>
    <span class="n">context</span><span class="o">().</span><span class="na">headers</span><span class="o">().</span><span class="na">add</span><span class="o">(</span><span class="s">"key"</span><span class="o">,</span> <span class="s">"value"</span><span class="o">.</span><span class="na">getBytes</span><span class="o">());</span>
<span class="o">}</span>
</pre></div>
      </div>
<div class="section" id="connecting-processors-and-state-stores"> |
|
<h2><a class="toc-backref" href="#id8">Connecting Processors and State Stores</a><a class="headerlink" href="#connecting-processors-and-state-stores" title="Permalink to this headline"></a></h2> |
|
<p>Now that a <a class="reference internal" href="#streams-developer-guide-stream-processor"><span class="std std-ref">processor</span></a> (WordCountProcessor) and the |
|
state stores have been defined, you can construct the processor topology by connecting these processors and state stores together by |
|
using the <code class="docutils literal"><span class="pre">Topology</span></code> instance. In addition, you can add source processors with the specified Kafka topics |
|
to generate input data streams into the topology, and sink processors with the specified Kafka topics to generate |
|
output data streams out of the topology.</p> |
|
<p>Here is an example implementation:</p> |
|
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">Topology</span> <span class="n">builder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Topology</span><span class="o">();</span> |
|
|
|
<span class="c1">// add the source processor node that takes Kafka topic "source-topic" as input</span> |
|
<span class="n">builder</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="s">"Source"</span><span class="o">,</span> <span class="s">"source-topic"</span><span class="o">)</span> |
|
|
|
<span class="c1">// add the WordCountProcessor node which takes the source processor as its upstream processor</span> |
|
<span class="o">.</span><span class="na">addProcessor</span><span class="o">(</span><span class="s">"Process"</span><span class="o">,</span> <span class="o">()</span> <span class="o">-></span> <span class="k">new</span> <span class="n">WordCountProcessor</span><span class="o">(),</span> <span class="s">"Source"</span><span class="o">)</span> |
|
|
|
<span class="c1">// add the count store associated with the WordCountProcessor processor</span> |
|
<span class="o">.</span><span class="na">addStateStore</span><span class="o">(</span><span class="n">countStoreBuilder</span><span class="o">,</span> <span class="s">"Process"</span><span class="o">)</span> |
|
|
|
<span class="c1">// add the sink processor node that takes Kafka topic "sink-topic" as output</span> |
|
<span class="c1">// and the WordCountProcessor node as its upstream processor</span> |
|
<span class="o">.</span><span class="na">addSink</span><span class="o">(</span><span class="s">"Sink"</span><span class="o">,</span> <span class="s">"sink-topic"</span><span class="o">,</span> <span class="s">"Process"</span><span class="o">);</span> |
|
</pre></div> |
|
</div> |
|
      <p>Here is a quick explanation of this example:</p>
      <ul class="simple">
        <li>A source processor node named <code class="docutils literal"><span class="pre">"Source"</span></code> is added to the topology using the <code class="docutils literal"><span class="pre">addSource</span></code> method, with one Kafka topic
          <code class="docutils literal"><span class="pre">"source-topic"</span></code> fed to it.</li>
        <li>A processor node named <code class="docutils literal"><span class="pre">"Process"</span></code> with the pre-defined <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> logic is then added as the downstream
          processor of the <code class="docutils literal"><span class="pre">"Source"</span></code> node using the <code class="docutils literal"><span class="pre">addProcessor</span></code> method.</li>
        <li>A predefined persistent key-value state store is created and associated with the <code class="docutils literal"><span class="pre">"Process"</span></code> node, using
          <code class="docutils literal"><span class="pre">countStoreBuilder</span></code>.</li>
        <li>A sink processor node is then added to complete the topology using the <code class="docutils literal"><span class="pre">addSink</span></code> method, taking the <code class="docutils literal"><span class="pre">"Process"</span></code> node
          as its upstream processor and writing to a separate <code class="docutils literal"><span class="pre">"sink-topic"</span></code> Kafka topic (note that users can also use another overloaded variant of <code class="docutils literal"><span class="pre">addSink</span></code>
          to dynamically determine the Kafka topic to write to for each received record from the upstream processor).</li>
      </ul>
      <p>In this topology, the <code class="docutils literal"><span class="pre">"Process"</span></code> stream processor node is considered a downstream processor of the <code class="docutils literal"><span class="pre">"Source"</span></code> node, and an
        upstream processor of the <code class="docutils literal"><span class="pre">"Sink"</span></code> node. As a result, whenever the <code class="docutils literal"><span class="pre">"Source"</span></code> node forwards a newly fetched record from
        Kafka to its downstream <code class="docutils literal"><span class="pre">"Process"</span></code> node, the <code class="docutils literal"><span class="pre">WordCountProcessor#process()</span></code> method is triggered to process the record and
        update the associated state store. Whenever <code class="docutils literal"><span class="pre">context#forward()</span></code> is called from the
        <code class="docutils literal"><span class="pre">WordCountProcessor</span></code>'s scheduled punctuation callback, the aggregate key-value pair will be sent via the <code class="docutils literal"><span class="pre">"Sink"</span></code> processor node to
        the Kafka topic <code class="docutils literal"><span class="pre">"sink-topic"</span></code>. Note that in the <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> implementation, you must refer to the
        same store name <code class="docutils literal"><span class="pre">"Counts"</span></code> when accessing the key-value store, otherwise an exception will be thrown at runtime,
        indicating that the state store cannot be found. If the state store is not associated with the processor
        in the <code class="docutils literal"><span class="pre">Topology</span></code> code, accessing it in the processor’s <code class="docutils literal"><span class="pre">init()</span></code> method will also throw an exception at
        runtime, indicating the state store is not accessible from this processor.</p>
      <p>Now that you have fully defined your processor topology in your application, you can proceed to
        <a class="reference internal" href="running-app.html#streams-developer-guide-execution"><span class="std std-ref">running the Kafka Streams application</span></a>.</p>
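      <p>A minimal launch sketch, assuming the <code class="docutils literal"><span class="pre">Topology</span></code> built above (the application id and bootstrap servers are placeholder values):</p>
      <div class="highlight-java"><div class="highlight"><pre>import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// "builder" is the Topology assembled in the example above
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
</pre></div>
      </div>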
    </div>
  </div>

  </div>
</div>
<div class="pagination"> |
|
<a href="/{{version}}/documentation/streams/developer-guide/dsl-api" class="pagination__btn pagination__btn__prev">Previous</a> |
|
<a href="/{{version}}/documentation/streams/developer-guide/datatypes" class="pagination__btn pagination__btn__next">Next</a> |
|
</div> |
|
</script> |
|
|
|
<!--#include virtual="../../../includes/_header.htm" --> |
|
<!--#include virtual="../../../includes/_top.htm" --> |
|
<div class="content documentation documentation--current"> |
|
<!--#include virtual="../../../includes/_nav.htm" --> |
|
<div class="right"> |
|
<!--#include virtual="../../../includes/_docs_banner.htm" --> |
|
<ul class="breadcrumbs"> |
|
<li><a href="/documentation">Documentation</a></li> |
|
<li><a href="/documentation/streams">Kafka Streams</a></li> |
|
<li><a href="/documentation/streams/developer-guide/">Developer Guide</a></li> |
|
</ul> |
|
<div class="p-content"></div> |
|
</div> |
|
</div> |
|
<!--#include virtual="../../../includes/_footer.htm" --> |
|
<script>
$(function() {
  // Show selected style on nav item
  $('.b-nav__streams').addClass('selected');

  // sticky secondary nav
  var $navbar = $(".sub-nav-sticky"),
      y_pos = $navbar.offset().top,
      height = $navbar.height();

  $(window).scroll(function() {
    var scrollTop = $(window).scrollTop();

    if (scrollTop > y_pos - height) {
      $navbar.addClass("navbar-fixed");
    } else if (scrollTop <= y_pos) {
      $navbar.removeClass("navbar-fixed");
    }
  });

  // Display docs subnav items
  $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
});
</script>