Code samples are now unified and correctly formatted.
Samples under Streams consistently use the prism library.
Reviewers: Bruno Cadonna <cadonna@apache.org>
<h2>Step 1: Run the application reset tool<a class="headerlink" href="#step-1-run-the-application-reset-tool" title="Permalink to this headline"></a></h2>
<p>Invoke the application reset tool from the command line.</p>
<p>Warning! This tool makes irreversible changes to your application. It is strongly recommended that you run this once with <code class="docutils literal"><span class="pre">--dry-run</span></code> to preview your changes before making them.</p>
<li><p class="first">Create a <code class="docutils literal"><span class="pre">java.util.Properties</span></code> instance.</p>
</li>
<li><p class="first">Set the <a class="reference internal" href="#streams-developer-guide-required-configs"><span class="std std-ref">parameters</span></a>. For example:</p>
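<p>A minimal sketch of the two required settings (the application ID and broker address below are placeholder values):</p>
<pre class="line-numbers"><code class="language-java">Properties props = new Properties();
// Required: a unique application ID and the Kafka bootstrap servers (placeholder values)
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");</code></pre>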
public void configure(final Map<String, ?> configs) {
dlqProducer = .. // get a producer from the configs map
dlqTopic = .. // get the topic name from the configs map
}
}</code></pre>
@Override
public void configure(final Map<String, ?> configs) {
dlqProducer = .. // get a producer from the configs map
dlqTopic = .. // get the topic name from the configs map
}
}</code></pre>
</div></blockquote>
</div>
@@ -434,32 +433,31 @@
<p>Each exception handler can return a <code>FAIL</code> or <code>CONTINUE</code> depending on the record and the exception thrown. Returning <code>FAIL</code> will signal that Streams should shut down and <code>CONTINUE</code> will signal that Streams
should ignore the issue and continue processing. If you want to provide an exception handler that always ignores records that are too large, you could implement something like the following:</p>
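<p>A sketch of such a handler (the class name is illustrative; it assumes the record-too-large condition surfaces as a <code>RecordTooLargeException</code> from the producer):</p>
<pre class="line-numbers"><code class="language-java">import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
    @Override
    public void configure(final Map<String, ?> configs) {}

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        // Drop records that are too large, fail on anything else
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        } else {
            return ProductionExceptionHandlerResponse.FAIL;
        }
    }
}</code></pre>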
processed but silently dropped. If you want to estimate a new timestamp, you can use the value provided via
<code class="docutils literal"><span class="pre">previousTimestamp</span></code> (i.e., a Kafka Streams timestamp estimation). Here is an example of a custom <code class="docutils literal"><span class="pre">TimestampExtractor</span></code> implementation:
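<p>A sketch of such an extractor; the embedded-timestamp accessor on the record value (<code>getTimestampInMillis()</code> on a hypothetical <code>MyPojo</code> class) is an assumption for illustration:</p>
<pre class="line-numbers"><code class="language-java">import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class MyEventTimeExtractor implements TimestampExtractor {

    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        // `MyPojo` is a hypothetical value class carrying an embedded event timestamp (ms since epoch)
        long timestamp = -1;
        final MyPojo value = (MyPojo) record.value();
        if (value != null) {
            timestamp = value.getTimestampInMillis();
        }
        if (timestamp < 0) {
            // Invalid timestamp: fall back to the Kafka Streams estimation if available,
            // otherwise to wall-clock time (processing-time)
            return previousTimestamp >= 0 ? previousTimestamp : System.currentTimeMillis();
        }
        return timestamp;
    }
}</code></pre>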
<div><p>The RocksDB configuration. Kafka Streams uses RocksDB as the default storage engine for persistent stores. To change the default
configuration for RocksDB, you can implement <code class="docutils literal"><span class="pre">RocksDBConfigSetter</span></code> and provide your custom class via <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/RocksDBConfigSetter.html">rocksdb.config.setter</a>.</p>
<p>Here is an example that adjusts the memory size consumed by RocksDB.</p>
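<p>A sketch of such a config setter (the class name and the 16 MB cache / write-buffer values are illustrative choices; like the bounded-memory example below, it is intended as a static nested class inside your application class):</p>
<pre class="line-numbers"><code class="language-java">public static class CustomRocksDBConfig implements RocksDBConfigSetter {
    // Keep the cache as a field so it can be closed in close()
    private final org.rocksdb.Cache cache = new org.rocksdb.LRUCache(16 * 1024L * 1024L);

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        // Shrink the block cache and bound the number of write buffers
        tableConfig.setBlockCache(cache);
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setTableFormatConfig(tableConfig);
        options.setMaxWriteBufferNumber(2);
    }

    @Override
    public void close(final String storeName, final Options options) {
        cache.close();
    }
}

// register the setter in the Streams configuration
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);</code></pre>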
and <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/clients/admin/package-summary.html">admin client</a> that are used internally.
The consumer, producer and admin client settings are defined by specifying parameters in a <code class="docutils literal"><span class="pre">StreamsConfig</span></code> instance.</p>
<p>In this example, the Kafka <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#SESSION_TIMEOUT_MS_CONFIG">consumer session timeout</a> is configured to be 60000 milliseconds in the Streams settings:</p>
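<p>A sketch of such a configuration (the broker address is a placeholder):</p>
<pre class="line-numbers"><code class="language-java">Properties streamsSettings = new Properties();
// Example of a "normal" Kafka Streams setting
streamsSettings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
// Customize the Kafka consumer session timeout used internally by Streams (60 seconds)
streamsSettings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);</code></pre>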
<h4><a class="toc-backref" href="#id17">Naming</a><a class="headerlink" href="#naming" title="Permalink to this headline"></a></h4>
@@ -811,18 +801,17 @@
<code class="docutils literal"><span class="pre">receive.buffer.bytes</span></code> are used to configure TCP buffers; <code class="docutils literal"><span class="pre">request.timeout.ms</span></code> and <code class="docutils literal"><span class="pre">retry.backoff.ms</span></code> control retries for client requests;
<code class="docutils literal"><span class="pre">retries</span></code> is used to configure how many retries are allowed when handling retriable errors from broker request responses.
You can avoid duplicate names by prefixing parameter names with <code class="docutils literal"><span class="pre">consumer.</span></code>, <code class="docutils literal"><span class="pre">producer.</span></code>, or <code class="docutils literal"><span class="pre">admin.</span></code> (e.g., <code class="docutils literal"><span class="pre">consumer.send.buffer.bytes</span></code> and <code class="docutils literal"><span class="pre">producer.send.buffer.bytes</span></code>).</p>
<p>You could further separate consumer configuration by adding different prefixes:</p>
<ul class="simple">
<li><code class="docutils literal"><span class="pre">main.consumer.</span></code> for the main consumer, which is the default consumer of a stream source.</li>
@@ -830,25 +819,21 @@
<li><code class="docutils literal"><span class="pre">global.consumer.</span></code> for the global consumer, which is used in global KTable construction.</li>
</ul>
<p>For example, if you only want to set the restore consumer's config without touching other consumers' settings, you could simply use <code class="docutils literal"><span class="pre">restore.consumer.</span></code> to set the config.</p>
<p>The same applies to <code class="docutils literal"><span class="pre">main.consumer.</span></code> and <code class="docutils literal"><span class="pre">global.consumer.</span></code> if you only want to specify the config for one consumer type.</p>
<p>Additionally, to configure the internal repartition/changelog topics, you could use the <code class="docutils literal"><span class="pre">topic.</span></code> prefix, followed by any of the standard topic configs.</p>
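<p>For illustration, a sketch that combines these prefixes (the parameter values are arbitrary examples):</p>
<pre class="line-numbers"><code class="language-java">Properties props = new Properties();
// applies to all consumers unless overridden by a more specific prefix
props.put("consumer.session.timeout.ms", 60000);
// applies only to the restore consumer
props.put("restore.consumer.max.poll.records", 100);
// applies to the internal repartition/changelog topics
props.put("topic.max.message.bytes", 2000000);</code></pre>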
<p>If you want to override serdes selectively, i.e., keep the defaults for some fields, then don’t specify the serde whenever you want to leverage the default settings:</p>
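<p>For instance, a sketch that keeps the default key serde but overrides the value serde when writing to a topic (the stream variable and topic name are illustrative):</p>
<pre class="line-numbers"><code class="language-java">KStream<String, Long> userCountByRegion = ...;
// Rely on the configured default serde for the key,
// but override the value serde with an explicit Long serde
userCountByRegion.to("RegionCountsTopic", Produced.valueSerde(Serdes.Long()));</code></pre>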
<p>If some of your incoming records are corrupted or ill-formatted, they will cause the deserializer class to report an error.
Since 1.0.x we have introduced a <code>DeserializationExceptionHandler</code> interface which allows
you to customize how to handle such records. The customized implementation of the interface can be specified via the <code>StreamsConfig</code>.
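<p>For example, a sketch that registers the built-in handler which logs the corrupted record and continues processing:</p>
<pre class="line-numbers"><code class="language-java">Properties props = new Properties();
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
          LogAndContinueExceptionHandler.class);</code></pre>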
@@ -101,12 +98,11 @@
<h3>Primitive and basic types<a class="headerlink" href="#primitive-and-basic-types" title="Permalink to this headline"></a></h3>
<p>Apache Kafka includes several built-in serde implementations for Java primitives and basic types such as <code class="docutils literal"><span class="pre">byte[]</span></code> in
its <code class="docutils literal"><span class="pre">kafka-clients</span></code> Maven artifact:</p>
<p>This artifact provides the following serde implementations under the package <a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/clients/src/main/java/org/apache/kafka/common/serialization">org.apache.kafka.common.serialization</a>, which you can leverage when e.g., defining default serializers in your Streams configuration.</p>
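<p>For example, a sketch of obtaining a few of these built-in serdes and registering one as the default key serde:</p>
<pre class="line-numbers"><code class="language-java">// from org.apache.kafka.common.serialization
Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();
Serde<byte[]> byteArraySerde = Serdes.ByteArray();

// e.g., use the String serde as the default key serde in the Streams configuration
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());</code></pre>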
<span id="streams-developer-guide-interactive-queries-local-key-value-stores"></span><h3><a class="toc-backref" href="#id4">Querying local key-value stores</a><a class="headerlink" href="#querying-local-key-value-stores" title="Permalink to this headline"></a></h3>
<p>To query a local key-value store, you must first create a topology with a key-value store. This example creates a key-value
store named “CountsKeyValueStore”. This store will hold the latest count for any word that is found on the topic “word-count-input”.</p>
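<p>A sketch of such a topology (it assumes String-typed input and that the default serdes are configured for String keys and values):</p>
<pre class="line-numbers"><code class="language-java">StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("word-count-input");

textLines
  // split each text line into lower-cased words
  .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
  // re-key the stream on the word itself
  .groupBy((key, word) -> word)
  // materialize the running counts in a queryable key-value store
  .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("CountsKeyValueStore"));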
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();</code></pre>
<p>After the application has started, you can get access to “CountsKeyValueStore” and then query it via the <a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java">ReadOnlyKeyValueStore</a> API:</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Get the key-value store CountsKeyValueStore</span>
<span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"count for hello:"</span><span class="o">+</span><span class="n">keyValueStore</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="s">"hello"</span><span class="o">));</span>
<span class="c1">// Get the values for a range of keys available in this application instance</span>
<span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"count for "</span><span class="o">+</span><span class="n">next</span><span class="o">.</span><span class="na">key</span><span class="o">+</span><span class="s">": "</span><span class="o">+</span><span class="n">next</span><span class="o">.</span><span class="na">value</span><span class="o">);</span>
<span class="o">}</span>
<span class="c1">// Get the values for all of the keys available in this application instance</span>
<span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"count for "</span><span class="o">+</span><span class="n">next</span><span class="o">.</span><span class="na">key</span><span class="o">+</span><span class="s">": "</span><span class="o">+</span><span class="n">next</span><span class="o">.</span><span class="na">value</span><span class="o">);</span>
<span class="o">}</span></code></pre></div>
</div>
<pre class="line-numbers"><code class="language-java">// Get the key-value store CountsKeyValueStore
System.out.println("count for hello:" + keyValueStore.get("hello"));
// Get the values for a range of keys available in this application instance
KeyValueIterator<String, Long> range = keyValueStore.range("all", "streams");
while (range.hasNext()) {
KeyValue<String, Long> next = range.next();
System.out.println("count for " + next.key + ": " + next.value);
}
// Get the values for all of the keys available in this application instance
KeyValueIterator<String, Long> range = keyValueStore.all();
while (range.hasNext()) {
KeyValue<String, Long> next = range.next();
System.out.println("count for " + next.key + ": " + next.value);
}</code></pre>
<p>You can also materialize the results of stateless operators by using the overloaded methods that take a <code class="docutils literal"><span class="pre">queryableStoreName</span></code>
<span id="streams-developer-guide-interactive-queries-local-window-stores"></span><h3><a class="toc-backref" href="#id5">Querying local window stores</a><a class="headerlink" href="#querying-local-window-stores" title="Permalink to this headline"></a></h3>
<p>A window store will potentially have many results for any given key because the key can be present in multiple windows.
However, there is only one result per window for a given key.</p>
<p>To query a local window store, you must first create a topology with a window store. This example creates a window store
named “CountsWindowStore” that contains the counts for words in 1-minute windows.</p>
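<p>A sketch of such a topology (<code>TimeWindows.of</code> is the older windowing API; newer releases use <code>TimeWindows.ofSizeWithNoGrace</code>):</p>
<pre class="line-numbers"><code class="language-java">StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("word-count-input");

textLines
  .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
  .groupBy((key, word) -> word)
  // count words per 1-minute window
  .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
  .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("CountsWindowStore"));</code></pre>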
<p>After the application has started, you can get access to “CountsWindowStore” and then query it via the <a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java">ReadOnlyWindowStore</a> API:</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Get the window store named "CountsWindowStore"</span>
<span class="c1">// Fetch values for the key "world" for all of the windows available in this application instance.</span>
<span class="c1">// To get *all* available windows we fetch windows from the beginning of time until now.</span>
<span class="kt">Instant</span> <span class="n">timeFrom</span> <span class="o">=</span> <span class="n">Instant</span><span class="o">.</span><span class="na">ofEpochMilli</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span> <span class="c1">// beginning of time = oldest available</span>
<span class="kt">Instant</span> <span class="n">timeTo</span> <span class="o">=</span> <span class="n">Instant</span><span class="o">.</span><span class="na">now</span><span class="o">();</span> <span class="c1">// now (in processing-time)</span>
<span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"Count of 'world' @ time "</span><span class="o">+</span><span class="n">windowTimestamp</span><span class="o">+</span><span class="s">" is "</span><span class="o">+</span><span class="n">next</span><span class="o">.</span><span class="na">value</span><span class="o">);</span>
<span class="o">}</span></code></pre></div>
</div>
<pre class="line-numbers"><code class="language-java">// Get the window store named "CountsWindowStore"
<span id="streams-developer-guide-interactive-queries-custom-stores"></span><h3><a class="toc-backref" href="#id6">Querying local custom state stores</a><a class="headerlink" href="#querying-local-custom-state-stores" title="Permalink to this headline"></a></h3>
@@ -233,43 +225,41 @@
<li>It is recommended that you provide an interface that restricts access to read-only operations. This prevents users of this API from mutating the state of your running Kafka Streams application out-of-band.</li>
</ul>
<p>The class/interface hierarchy for your custom store might look something like:</p>
<span class="c1">// implementation of the supplier for MyCustomStore</span>
<span class="o">}</span></code></pre></div>
</div>
<pre class="line-numbers"><code class="language-java">public class MyCustomStore<K,V> implements StateStore, MyWriteableCustomStore<K,V> {
// implementation of the actual store
}
// Read-write interface for MyCustomStore
public interface MyWriteableCustomStore<K,V> extends MyReadableCustomStore<K,V> {
void write(K key, V value);
}
// Read-only interface for MyCustomStore
public interface MyReadableCustomStore<K,V> {
V read(K key);
}
public class MyCustomStoreBuilder implements StoreBuilder {
// implementation of the supplier for MyCustomStore
}</code></pre>
<p>To make this store queryable you must:</p>
<ul class="simple">
<li>Provide an implementation of <a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java">QueryableStoreType</a>.</li>
<li>Provide a wrapper class that has access to all of the underlying instances of the store and is used for querying.</li>
</ul>
<p>Here is how to implement <code class="docutils literal"><span class="pre">QueryableStoreType</span></code>:</p>
<pre class="line-numbers"><code class="language-java">public class MyCustomStoreType<K,V> implements QueryableStoreType<MyReadableCustomStore<K,V>> {
    // Only accept StateStores that are of type MyCustomStore
<p>A wrapper class is required because each instance of a Kafka Streams application may run multiple stream tasks and manage
multiple local instances of a particular state store. The wrapper class hides this complexity and lets you query a “logical”
state store by name without having to know about all of the underlying local instances of that state store.</p>
@@ -279,56 +269,53 @@
<code class="docutils literal"><span class="pre">StateStoreProvider#stores(String</span> <span class="pre">storeName,</span> <span class="pre">QueryableStoreType<T></span> <span class="pre">queryableStoreType)</span></code> returns a <code class="docutils literal"><span class="pre">List</span></code> of state
stores with the given storeName and of the type as defined by <code class="docutils literal"><span class="pre">queryableStoreType</span></code>.</p>
<p>Here is an example implementation of the wrapper (Java 8+):</p>
<div class="highlight-java"><div class="highlight"><pre><code><span></span><span class="c1">// We strongly recommend implementing a read-only interface</span>
<span class="c1">// to restrict usage of the store to safe read operations!</span>
<span id="streams-developer-guide-interactive-queries-discover-app-instances-and-stores"></span><h3><a class="toc-backref" href="#id10">Discovering and accessing application instances and their local state stores</a><a class="headerlink" href="#discovering-and-accessing-application-instances-and-their-local-state-stores" title="Permalink to this headline"></a></h3>
<p>The following methods return <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/StreamsMetadata.html">StreamsMetadata</a> objects, which provide meta-information about application instances such as their RPC endpoint and locally available state stores.</p>
<p>For example, we can now find the <code class="docutils literal"><span class="pre">StreamsMetadata</span></code> for the state store named “word-count” that we defined in the earlier example:</p>
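<p>A sketch of using these methods (note that <code>metadataForKey</code> was superseded by <code>queryMetadataForKey</code> in newer releases):</p>
<pre class="line-numbers"><code class="language-java">KafkaStreams streams = ...;

// Find all app instances that host a local shard of the "word-count" store
Collection<StreamsMetadata> wordCountHosts = streams.allMetadataForStore("word-count");

// Find the single instance that holds the count for the key "alice"
StreamsMetadata metadata = streams.metadataForKey("word-count", "alice", Serdes.String().serializer());
System.out.println("Key 'alice' lives on " + metadata.host() + ":" + metadata.port());</code></pre>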
<p>The cache size is specified through the <code class="docutils literal"><span class="pre">cache.max.bytes.buffering</span></code> parameter, which is a global setting per
processing topology:</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Enable record cache of size 10 MB.</span>
<p>This parameter controls the number of bytes allocated for caching. Specifically, for a processor topology instance with
<code class="docutils literal"><span class="pre">T</span></code> threads and <code class="docutils literal"><span class="pre">C</span></code> bytes allocated for caching, each thread will have an even <code class="docutils literal"><span class="pre">C/T</span></code> bytes to construct its own
cache and use as it sees fit among its tasks. This means that there are as many caches as there are threads, but no sharing of
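<p>For instance, a minimal sketch of setting a 10 MB cache; with 5 processing threads, each thread would get 2 MB:</p>
<pre class="line-numbers"><code class="language-java">Properties props = new Properties();
// 10 MB record cache, shared evenly across all processing threads
// (e.g., with num.stream.threads = 5, each thread gets 2 MB)
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);</code></pre>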
@@ -103,27 +102,16 @@
<p>Here are example settings for both parameters based on desired scenarios.</p>
<ul>
<li><p class="first">To turn off caching, the cache size can be set to zero:</p>
<blockquote>
<div><div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Disable record cache</span>
<p>Turning off caching might result in high write traffic for the underlying RocksDB store.
With default settings caching is enabled within Kafka Streams but RocksDB caching is disabled.
Thus, to avoid high write traffic it is recommended to enable RocksDB caching if Kafka Streams caching is turned off.</p>
<p>For example, the RocksDB Block Cache could be set to 100 MB and the Write Buffer size to 32 MB. For more information, see
the <a class="reference internal" href="config-streams.html#streams-developer-guide-rocksdb-config"><span class="std std-ref">RocksDB config</span></a>.</p>
</div></blockquote>
<pre class="line-numbers"><code class="language-java">// Disable record cache
<li><p class="first">To enable caching but still have an upper bound on how long records will be cached, you can set the commit interval. In this example, it is set to 1000 milliseconds:</p>
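<p>A sketch of such a configuration (the 10 MB cache size is an illustrative choice):</p>
<pre class="line-numbers"><code class="language-java">Properties props = new Properties();
// Enable caching (10 MB) but flush/forward cached records at least every 1000 ms
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);</code></pre>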
<p>The effect of these two configurations is described in the figure below. The records are shown using 4 keys: blue, red, yellow, and green. Assume the cache has space for only 3 keys.</p>
@@ -156,13 +144,12 @@
<p>Following from the example first shown in section <a class="reference internal" href="processor-api.html#streams-developer-guide-state-store"><span class="std std-ref">State Stores</span></a>, to disable caching, you can
add the <code class="docutils literal"><span class="pre">withCachingDisabled</span></code> call (note that caches are enabled by default, however there is an explicit <code class="docutils literal"><span class="pre">withCachingEnabled</span></code> call):
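<p>A sketch of disabling caching on a persistent key-value store builder (store name and serdes are illustrative):</p>
<pre class="line-numbers"><code class="language-java">StoreBuilder<KeyValueStore<String, Long>> countStoreBuilder =
  Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("Counts"),
    Serdes.String(),
    Serdes.Long())
  .withCachingDisabled();</code></pre>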
<p>Also, we recommend changing RocksDB's default memory allocator, because the default allocator may lead to increased memory consumption.
To change the memory allocator to <code>jemalloc</code>, you need to set the environment variable <code>LD_PRELOAD</code> before you start your Kafka Streams application:</p>
<pre>
# example: install jemalloc (on Debian)
<pre class="line-numbers"><code class="language-bash"># example: install jemalloc (on Debian)
$ apt install -y libjemalloc-dev
# set LD_PRELOAD before you start your Kafka Streams application
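# (the library path below is only an example and depends on your distribution)
export LD_PRELOAD="/usr/lib/x86_64-linux-gnu/libjemalloc.so"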
<p>As of 2.3.0 the memory usage across all instances can be bounded, limiting the total off-heap memory of your Kafka Streams application. To do so you must configure RocksDB to cache the index and filter blocks in the block cache, limit the memtable memory through a shared <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Write-Buffer-Manager">WriteBufferManager</a> and count its memory against the block cache, and then pass the same Cache object to each instance. See <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB">RocksDB Memory Usage</a> for details. An example RocksDBConfigSetter implementing this is shown below:</p>
<pre class="line-numbers"><code class="language-java">public static class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {
    // These three options in combination will limit the memory used by RocksDB to the size passed to the block cache (TOTAL_OFF_HEAP_MEMORY)
// Cache and WriteBufferManager should not be closed here, as the same objects are shared by every store instance.
}
}</code></pre>
<div>
<sup id="fn1">1. INDEX_FILTER_BLOCK_RATIO can be used to set a fraction of the block cache to set aside for "high priority" (aka index and filter) blocks, preventing them from being evicted by data blocks. See the full signature of the <a class="reference external" href="https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/LRUCache.java#L72">LRUCache constructor</a>.
NOTE: the boolean parameter in the cache constructor lets you control whether the cache should enforce a strict memory limit by failing the read or iteration in the rare cases where it might go larger than its capacity. Due to a
<li>In the <code class="docutils literal"><span class="pre">process()</span></code> method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this later in this section).</li>
<li>In the <code class="docutils literal"><span class="pre">punctuate()</span></code> method, iterate the local state store and send the aggregated counts to the downstream processor (we will talk about downstream processors later in this section), and commit the current stream state. A sketch of such a processor is shown after this list.</li>
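<p>A sketch of such a processor against the older <code>Processor&lt;K, V&gt;</code> API (deprecated in newer releases); it assumes a state store named "Counts" has been added to the topology and connected to this processor, and it uses a scheduled punctuation in place of the deprecated <code>punctuate()</code> callback:</p>
<pre class="line-numbers"><code class="language-java">import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class WordCountProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, Long> kvStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        this.kvStore = (KeyValueStore<String, Long>) context.getStateStore("Counts");
        // every second (stream-time), emit the current counts downstream and commit
        this.context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
            try (final KeyValueIterator<String, Long> iter = kvStore.all()) {
                while (iter.hasNext()) {
                    final KeyValue<String, Long> entry = iter.next();
                    context.forward(entry.key, entry.value.toString());
                }
            }
            context.commit();
        });
    }

    @Override
    public void process(final String key, final String line) {
        // split the line into words and update the running count of each word
        for (final String word : line.toLowerCase().split("\\W+")) {
            final Long oldCount = kvStore.get(word);
            kvStore.put(word, oldCount == null ? 1L : oldCount + 1L);
        }
    }

    @Override
    public void close() {}
}</code></pre>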
of the store through <code class="docutils literal"><span class="pre">enableLogging()</span></code> and <code class="docutils literal"><span class="pre">disableLogging()</span></code>.
You can also fine-tune the associated topic’s configuration if needed.</p>
<span class="o">.</span><span class="na">withLoggingDisabled</span><span class="o">();</span> <span class="c1">// disable backing up the store to a changelog topic</span></code></pre></div>
.withLoggingDisabled(); // disable backing up the store to a changelog topic</code></pre>
<div class="admonition attention">
<p class="first admonition-title">Attention</p>
<p class="last">If the changelog is disabled then the attached state store is no longer fault tolerant and it can’t have any <a class="reference internal" href="config-streams.html#streams-developer-guide-standby-replicas"><span class="std std-ref">standby replicas</span></a>.</p>
@@ -333,19 +329,18 @@
<p>Here is an example for enabling fault tolerance, with additional changelog-topic configuration:
You can add any log config from <a class="reference external" href="https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogConfig.scala">kafka.log.LogConfig</a>.
.withLoggingEnabled(changelogConfig); // enable changelogging, with custom changelog settings</code></pre>
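<p>A fuller sketch of the snippet above (the <code>min.insync.replicas</code> override and the store name are illustrative):</p>
<pre class="line-numbers"><code class="language-java">Map<String, String> changelogConfig = new HashMap<>();
// override min.insync.replicas for the store's changelog topic
changelogConfig.put("min.insync.replicas", "1");

StoreBuilder<KeyValueStore<String, Long>> countStoreBuilder =
  Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("Counts"),
    Serdes.String(),
    Serdes.Long())
  .withLoggingEnabled(changelogConfig); // enable changelogging, with custom changelog settings</code></pre>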
</div>
<div class="section" id="timestamped-state-stores">
<span id="streams-developer-guide-state-store-timestamps"></span><h3><a class="toc-backref" href="#id11">Timestamped State Stores</a><a class="headerlink" href="#timestamped-state-stores" title="Permalink to this headline"></a></h3>
@@ -389,12 +384,11 @@
<code class="docutils literal"><span class="pre">partition</span></code>, <code class="docutils literal"><span class="pre">offset</span></code>, <code class="docutils literal"><span class="pre">timestamp</span></code> and
<h2><a class="toc-backref" href="#id8">Connecting Processors and State Stores</a><a class="headerlink" href="#connecting-processors-and-state-stores" title="Permalink to this headline"></a></h2>
<p>Now that a <a class="reference internal" href="#streams-developer-guide-stream-processor"><span class="std std-ref">processor</span></a> (WordCountProcessor) and the
@@ -403,16 +397,16 @@
to generate input data streams into the topology, and sink processors with the specified Kafka topics to generate
output data streams out of the topology.</p>
<p>Here is an example implementation:</p>
<pre class="line-numbers"><code class="language-java">Topology builder = new Topology();
// add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")
// add the WordCountProcessor node which takes the source processor as its upstream processor
.addProcessor("Process", () -> new WordCountProcessor(), "Source")
// add the count store associated with the WordCountProcessor processor
.addStateStore(countStoreBuilder, "Process")
// add the sink processor node that takes Kafka topic "sink-topic" as output
// and the WordCountProcessor node as its upstream processor
<p>Here is a quick explanation of this example:</p>
<ul class="simple">
<li>A source processor node named <code class="docutils literal"><span class="pre">"Source"</span></code> is added to the topology using the <code class="docutils literal"><span class="pre">addSource</span></code> method, with one Kafka topic
@@ -429,22 +423,22 @@
This can be done by implementing <code class="docutils literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the <code class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
instead of calling <code class="docutils literal"><span class="pre">Topology#addStateStore()</span></code>, like this:
</p>
<pre class="line-numbers"><code class="language-java">Topology builder = new Topology();
// add the source processor node that takes Kafka "source-topic" as input
builder.addSource("Source", "source-topic")
// add the WordCountProcessor node which takes the source processor as its upstream processor.
// the ProcessorSupplier provides the count store associated with the WordCountProcessor
.addProcessor("Process", new ProcessorSupplier<String, String>() {
public Processor<String, String> get() {
return new WordCountProcessor();
}
public Set<StoreBuilder<?>> stores() {
return Collections.singleton(countStoreBuilder);
}
}, "Source")
// add the sink processor node that takes Kafka topic "sink-topic" as output
// and the WordCountProcessor node as its upstream processor
<p>This allows for a processor to "own" state stores, effectively encapsulating their usage from the user wiring the topology.
Multiple processors that share a state store may provide the same store with this technique, as long as the <code class="docutils literal"><span class="pre">StoreBuilder</span></code> is the same <code class="docutils literal"><span class="pre">instance</span></code>.</p>
<p>In these topologies, the <code class="docutils literal"><span class="pre">"Process"</span></code> stream processor node is considered a downstream processor of the <code class="docutils literal"><span class="pre">"Source"</span></code> node, and an
<span id="streams-developer-guide-execution-starting"></span><h2><a class="toc-backref" href="#id3">Starting a Kafka Streams application</a><a class="headerlink" href="#starting-a-kafka-streams-application" title="Permalink to this headline"></a></h2>
<p>You can package your Java application as a fat JAR file and then start the application like this:</p>
<div class="highlight-bash"><div class="highlight"><pre><code><span></span><span class="c1"># Start the application in class `com.example.MyStreamsApp`</span>
<span class="c1"># from the fat JAR named `path-to-app-fatjar.jar`.</span>
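# (a minimal invocation sketch; adjust the classpath to your packaging)
$ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp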
<p>When you start your application you are launching a Kafka Streams instance of your application. You can run multiple
instances of your application. A common scenario is that there are multiple instances of your application running in
parallel. For more information, see <aclass="reference internal"href="../architecture.html#streams_architecture_tasks"><spanclass="std std-ref">Parallelism Model</span></a>.</p>
<p>Configure these settings in the application for your <code class="docutils literal"><span class="pre">Properties</span></code> instance. These settings will encrypt any
data-in-transit that is being read from or written to Kafka, and your application will authenticate itself against the
Kafka brokers that it is communicating with. Note that this example does not cover client authorization.</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Code of your Java application that uses the Kafka Streams library</span>
<p>If you incorrectly configure a security setting in your application, it will fail at runtime, typically right after you
start it. For example, if you enter an incorrect password for the <code class="docutils literal"><span class="pre">ssl.keystore.password</span></code> setting, an error message
similar to this would be logged and then the application would terminate:</p>
<p class="last">See the section <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data Types and Serialization</span></a> for more information about Serializers/Deserializers.</p>
</div>
<p>Example <code class="docutils literal"><span class="pre">pom.xml</span></code> snippet when using Maven:</p>
<h2>Using Kafka Streams within your application code<a class="headerlink" href="#using-kafka-streams-within-your-application-code" title="Permalink to this headline"></a></h2>
@@ -120,79 +120,69 @@
<li>The second argument is an instance of <code class="docutils literal"><span class="pre">java.util.Properties</span></code>, which defines the configuration for this specific topology.</li>
<span class="c1">// Use the builders to define the actual processing topology, e.g. to specify</span>
<span class="c1">// from which input topics to read, which stream operations (filter, map, etc.)</span>
<span class="c1">// should be called, and so on. We will cover this in detail in the subsequent</span>
<span class="c1">// sections of this Developer Guide.</span>
<span class="n">StreamsBuilder</span> <span class="n">builder</span> <span class="o">=</span> <span class="o">...;</span> <span class="c1">// when using the DSL</span>
<span class="n">Topology</span> <span class="n">topology</span> <span class="o">=</span> <span class="o">...;</span> <span class="c1">// when using the Processor API</span>
<span class="c1">// Use the configuration to tell your application where the Kafka cluster is,</span>
<span class="c1">// which Serializers/Deserializers to use by default, to specify security settings,</span>
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on. We will cover this in detail in the subsequent
// sections of this Developer Guide.
StreamsBuilder builder = ...; // when using the DSL
Topology topology = builder.build();
//
// OR
//
Topology topology = ...; // when using the Processor API
// Use the configuration to tell your application where the Kafka cluster is,
// which Serializers/Deserializers to use by default, to specify security settings,
// and so on.
Properties props = ...;
KafkaStreams streams = new KafkaStreams(topology, props);</code></pre>
<p>At this point, internal structures are initialized, but the processing is not started yet.
You have to explicitly start the Kafka Streams thread by calling the <codeclass="docutils literal"><spanclass="pre">KafkaStreams#start()</span></code> method:</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Start the Kafka Streams threads</span>
<pre class="line-numbers"><code class="language-java">// Start the Kafka Streams threads
streams.start();</code></pre>
<p>If there are other instances of this stream processing application running elsewhere (e.g., on another machine), Kafka
Streams transparently re-assigns tasks from the existing instances to the new instance that you just started.
For more information, see <aclass="reference internal"href="../architecture.html#streams_architecture_tasks"><spanclass="std std-ref">Stream Partitions and Tasks</span></a> and <aclass="reference internal"href="../architecture.html#streams_architecture_threads"><spanclass="std std-ref">Threading Model</span></a>.</p>
<p>To catch any unexpected exceptions, you can set a <code class="docutils literal"><span class="pre">java.lang.Thread.UncaughtExceptionHandler</span></code> before you start the
application. This handler is called whenever a stream thread is terminated by an unexpected exception:</p>
<div class="highlight-java"><div class="highlight"><pre><code><span></span><span class="c1">// Java 8+, using lambda expressions</span>
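// A sketch of registering such a handler (the reaction inside the lambda is up to your application):
streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
  // examine the exception and react appropriately, e.g. log it and initiate a shutdown
});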
You can use different values for the <code>groupId</code>, <code>artifactId</code> and <code>package</code> parameters if you like.
Assuming the above parameter values are used, this command will create a project structure that looks like this:
</p>
<pre class="line-numbers"><code class="language-bash">> tree streams.examples
streams-quickstart
|-- pom.xml
|-- src
|-- main
|-- java
| |-- myapps
| |-- LineSplit.java
| |-- Pipe.java
| |-- WordCount.java
|-- resources
|-- log4j.properties</code></pre>
<p>
The <code>pom.xml</code> file included in the project already has the Streams dependency defined.
@@ -79,22 +78,22 @@
Since we are going to start writing such programs from scratch, we can now delete these examples:
</p>
<pre class="line-numbers"><code class="language-bash">> cd streams-quickstart
> rm src/main/java/myapps/*.java</code></pre>
<h4><a id="tutorial_code_pipe" href="#tutorial_code_pipe">Writing a first Streams application: Pipe</a></h4>
It's coding time now! Feel free to open your favorite IDE and import this Maven project, or simply open a text editor and create a java file under <code>src/main/java/myapps</code>.
public static void main(String[] args) throws Exception {
}
}</code></pre>
<p>
We are going to fill in the <code>main</code> function to write this pipe program. Note that we will not list the import statements as we go since IDEs can usually add them automatically.
@@ -107,16 +106,16 @@
and <code>StreamsConfig.APPLICATION_ID_CONFIG</code>, which gives the unique identifier of your Streams application to distinguish it from other applications talking to the same Kafka cluster:
</p>
<pre class="line-numbers"><code class="language-java">Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assuming that the Kafka broker this application is talking to runs on local machine with port 9092</code></pre>
<p>
In addition, you can customize other configurations in the same map, for example, default serialization and deserialization libraries for the record key-value pairs:
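<p>For instance, a sketch using the built-in String serde for both record keys and values:</p>
<pre class="line-numbers"><code class="language-java">props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());</code></pre>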
As shown above, it illustrates that the constructed topology has two processor nodes, a source node <code>KSTREAM-SOURCE-0000000000</code> and a sink node <code>KSTREAM-SINK-0000000001</code>.
@@ -189,7 +188,7 @@
we can now construct the Streams client with the two components we have just constructed above: the configuration map specified in a <code>java.util.Properties</code> instance and the <code>Topology</code> object.
</p>
<pre class="line-numbers"><code class="language-java">final KafkaStreams streams = new KafkaStreams(topology, props);</code></pre>
<p>
By calling its <code>start()</code> function we can trigger the execution of this client.
@@ -197,76 +196,76 @@
We can, for example, add a shutdown hook with a countdown latch to capture a user interrupt and close the client upon terminating this program:
</p>
<pre class="line-numbers"><code class="language-java">final CountDownLatch latch = new CountDownLatch(1);
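
// A sketch of the common shutdown pattern: close the Streams client on Ctrl-C
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
    @Override
    public void run() {
        streams.close();
        latch.countDown();
    }
});

try {
    streams.start();
    latch.await();
} catch (final Throwable e) {
    System.exit(1);
}
System.exit(0);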
As we can see above, a new processor node <code>KSTREAM-FLATMAPVALUES-0000000001</code> is injected into the topology between the original source and sink nodes.
@@ -365,41 +364,41 @@
The complete code looks like this (assuming lambda expression is used):
In order to count the words, we can first modify the <code>flatMapValues</code> operator to treat all words as lower case (assuming lambda expressions are used):
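<p>A sketch of that change, assuming the source stream variable is named <code>source</code>:</p>
<pre class="line-numbers"><code class="language-java">KStream<String, String> words = source.flatMapValues(value ->
    Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")));</code></pre>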
In order to do the counting aggregation we have to first specify that we want to key the stream on the value string, i.e. the lower-cased word, with a <code>groupBy</code> operator.
This operator generates a new grouped stream, which can then be aggregated by a <code>count</code> operator, which generates a running count on each of the grouped keys:
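<p>A sketch of the resulting pipeline (again assuming the source stream is named <code>source</code>; the state store behind <code>count()</code> is left at its default here):</p>
<pre class="line-numbers"><code class="language-java">KTable<String, Long> counts = source
    .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
    .groupBy((key, value) -> value)
    .count();</code></pre>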