<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<script><!--#include virtual="../js/templateData.js" --></script>
<script id="content-template" type="text/x-handlebars-template">
<h1>Developer Manual</h1>
<ul class="toc">
<li><a href="#streams_processor">1. Low-level Processor API</a>
<ul>
<li><a href="#streams_processor_process">1.1 Processor</a>
<li><a href="#streams_processor_topology">1.2 Processor Topology</a>
<li><a href="#streams_processor_statestore">1.3 State Stores</a>
<li><a href="#restoration_progress">1.4 Monitoring the Restoration Progress of Fault-tolerant State Store</a>
<li><a href="#disable-changelogs">1.5 Enable / Disable Fault Tolerance of State Stores (Store Changelogs)</a>
<li><a href="#implementing-custom-state-stores">1.6 Implementing Custom State Stores</a>
<li><a href="#connecting-processors-and-state-stores">1.7 Connecting Processors and State Stores</a>
<li><a href="#streams_processor_describe">1.5 Describe a Topology</a>
</ul>
</li>
<li><a href="#streams_dsl">2. High-Level Streams DSL</a>
<ul>
<li><a href="#streams_duality">2.1 Duality of Streams and Tables</a>
<li><a href="#streams_dsl_source">2.2 Creating Source Streams from Kafka</a>
<li><a href="#streams_dsl_transform">2.3 Transform a stream</a>
<li><a href="#streams_dsl_sink">2.4 Write streams back to Kafka</a>
<li><a href="#streams_dsl_build">2.5 Generate the processor topology</a>
</ul>
</li>
<li><a href="#streams_interactive_queries">3. Interactive Queries</a>
<ul>
<li><a href="#streams_developer-guide_interactive-queries_your_app">3.1 Your application and interactive queries</a>
<li><a href="#streams_developer-guide_interactive-queries_local-stores">3.2 Querying local state stores (for an application instance)</a>
<li><a href="#streams_developer-guide_interactive-queries_local-key-value-stores">3.3 Querying local key-value stores</a>
<li><a href="#streams_developer-guide_interactive-queries_local-window-stores">3.4 Querying local window stores</a>
<li><a href="#streams_developer-guide_interactive-queries_custom-stores">3.5 Querying local custom state stores</a>
<li><a href="#streams_developer-guide_interactive-queries_discovery">3.6 Querying remote state stores (for the entire application)</a>
<li><a href="#streams_developer-guide_interactive-queries_rpc-layer">3.7 Adding an RPC layer to your application</a>
<li><a href="#streams_developer-guide_interactive-queries_expose-rpc">3.8 Exposing the RPC endpoints of your application</a>
<li><a href="#streams_developer-guide_interactive-queries_discover-app-instances-and-stores">3.9 Discovering and accessing application instances and their respective local state stores</a>
</ul>
</li>
<li><a href="#streams_developer-guide_memory-management">4. Memory Management</a>
<ul>
<li><a href="#streams_developer-guide_memory-management_record-cache">4.1 Record caches in the DSL</a>
<li><a href="#streams_developer-guide_memory-management_state-store-cache">4.2 State store caches in the Processor API</a>
<li><a href="#streams_developer-guide_memory-management_other_memory_usage">4.3 Other memory usage</a>
</ul>
</li>
<li><a href="#streams_configure_execute">5. Application Configuration and Execution</a>
<ul>
<li><a href="#streams_client_config">5.1 Producer and Consumer Configuration</a>
<li><a href="#streams_broker_config">5.2 Broker Configuration</a>
<li><a href="#streams_topic_config">5.3 Internal Topic Configuration</a>
<li><a href="#streams_execute">5.4 Executing Your Kafka Streams Application</a>
</ul>
</li>
</ul>
<p>
There is a <a href="/{{version}}/documentation/#quickstart_kafkastreams">quickstart</a> example that shows how to run a stream processing program written with the Kafka Streams library.
This section focuses on how to write, configure, and execute a Kafka Streams application.
</p>
<p>
As we have mentioned above, the computational logic of a Kafka Streams application is defined as a <a href="/{{version}}/documentation/streams/core-concepts#streams_topology">processor topology</a>.
Currently Kafka Streams provides two sets of APIs to define the processor topology, which will be described in the subsequent sections.
</p>
<h3><a id="streams_processor" href="#streams_processor">Low-Level Processor API</a></h3>
<h4><a id="streams_processor_process" href="#streams_processor_process">Processor</a></h4>
<p>
As mentioned in the <a href="/{{version}}/documentation/streams/core-concepts"><b>Core Concepts</b></a> section, a stream processor is a node in the processor topology that represents a single processing step.
With the <code>Processor</code> API developers can define arbitrary stream processors that process one received record at a time, and connect these processors with
their associated state stores to compose the processor topology that represents their customized processing logic.
</p>
<p>
The <code>Processor</code> interface provides one main API method, the <code>process</code> method,
which is performed on each of the received records.
In addition, the processor can maintain the current <code>ProcessorContext</code> instance variable initialized in the <code>init</code> method
and use the context to schedule a periodically called punctuation function (<code>context().schedule</code>),
to forward the modified / new key-value pair to downstream processors (<code>context().forward</code>),
to commit the current processing progress (<code>context().commit</code>), etc.
</p>
<p>
The following example <code>Processor</code> implementation defines a simple word-count algorithm:
</p>
<pre class="brush: java;">
public class MyProcessor implements Processor&lt;String, String&gt; {
    private ProcessorContext context;
    private KeyValueStore&lt;String, Long&gt; kvStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // keep the processor context locally because we need it in punctuate() and commit()
        this.context = context;

        // schedule a punctuation method every 1000 milliseconds.
        this.context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
            @Override
            public void punctuate(long timestamp) {
                KeyValueIterator&lt;String, Long&gt; iter = kvStore.all();

                while (iter.hasNext()) {
                    KeyValue&lt;String, Long&gt; entry = iter.next();
                    context.forward(entry.key, entry.value.toString());
                }

                // it is the caller's responsibility to close the iterator on the state store;
                // otherwise it may lead to memory and file handle leaks depending on the
                // underlying state store implementation.
                iter.close();

                // commit the current processing progress
                context.commit();
            }
        });

        // retrieve the key-value store named "Counts"
        this.kvStore = (KeyValueStore&lt;String, Long&gt;) context.getStateStore("Counts");
    }

    @Override
    public void process(String dummy, String line) {
        String[] words = line.toLowerCase().split(" ");

        for (String word : words) {
            Long oldValue = this.kvStore.get(word);

            if (oldValue == null) {
                this.kvStore.put(word, 1L);
            } else {
                this.kvStore.put(word, oldValue + 1L);
            }
        }
    }

    @Override
    public void punctuate(long timestamp) {
        KeyValueIterator&lt;String, Long&gt; iter = this.kvStore.all();

        while (iter.hasNext()) {
            KeyValue&lt;String, Long&gt; entry = iter.next();
            context.forward(entry.key, entry.value.toString());
        }

        iter.close(); // avoid OOM

        // commit the current processing progress
        context.commit();
    }

    @Override
    public void close() {
        // close any resources managed by this processor.
        // Note: Do not close any StateStores as these are managed
        // by the library
    }
}
</pre>
<p>
In the above implementation, the following actions are performed:
</p>
<ul>
<li>In the <code>init</code> method, schedule the punctuation every 1 second and retrieve the local state store by its name "Counts".</li>
<li>In the <code>process</code> method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this feature later in the section).</li>
<li>In the scheduled <code>punctuate</code> method, iterate the local state store and send the aggregated counts to the downstream processor, and commit the current stream state.</li>
<li>When done with the <code>KeyValueIterator&lt;String, Long&gt;</code> you <em>must</em> close the iterator, as shown above, or use a try-with-resources statement.</li>
</ul>
<h4><a id="streams_processor_topology" href="#streams_processor_topology">Processor Topology</a></h4>
<p>
With the customized processors defined in the Processor API, developers can use <code>Topology</code> to build a processor topology
by connecting these processors together:
</p>
<pre class="brush: java;">
Topology topology = new Topology();

topology.addSource("SOURCE", "src-topic")
    // add "PROCESS1" node which takes the source processor "SOURCE" as its upstream processor
    .addProcessor("PROCESS1", () -> new MyProcessor1(), "SOURCE")

    // add "PROCESS2" node which takes "PROCESS1" as its upstream processor
    .addProcessor("PROCESS2", () -> new MyProcessor2(), "PROCESS1")

    // add "PROCESS3" node which takes "PROCESS1" as its upstream processor
    .addProcessor("PROCESS3", () -> new MyProcessor3(), "PROCESS1")

    // add the sink processor node "SINK1" that takes Kafka topic "sink-topic1"
    // as output and the "PROCESS1" node as its upstream processor
    .addSink("SINK1", "sink-topic1", "PROCESS1")

    // add the sink processor node "SINK2" that takes Kafka topic "sink-topic2"
    // as output and the "PROCESS2" node as its upstream processor
    .addSink("SINK2", "sink-topic2", "PROCESS2")

    // add the sink processor node "SINK3" that takes Kafka topic "sink-topic3"
    // as output and the "PROCESS3" node as its upstream processor
    .addSink("SINK3", "sink-topic3", "PROCESS3");
</pre>
There are several steps in the above code to build the topology, and here is a quick walk-through:
<ul>
<li>First of all a source node named "SOURCE" is added to the topology using the <code>addSource</code> method, with one Kafka topic "src-topic" fed to it.</li>
<li>Three processor nodes are then added using the <code>addProcessor</code> method; here the first processor is a child of the "SOURCE" node, but is the parent of the other two processors.</li>
<li>Finally three sink nodes are added to complete the topology using the <code>addSink</code> method, each piping from a different parent processor node and writing to a separate topic.</li>
</ul>
<h4><a id="streams_processor_statestore" href="#streams_processor_statestore">State Stores</a></h4>
<p>
In order to make state stores fault-tolerant (e.g., to recover from machine crashes) as well as to allow for state store migration without data loss (e.g., to migrate a stateful stream task from one machine to another when elastically adding or removing capacity from your application), a state store can be <strong>continuously backed up</strong> to a Kafka topic behind the scenes.
We sometimes refer to this topic as the state store's associated <em>changelog topic</em> or simply its <em>changelog</em>.
In the case of a machine failure, for example, the state store and thus the application's state can be fully restored from its changelog.
You can enable or disable this backup feature for a state store, and thus its fault tolerance.
</p>
<p>
By default, persistent <strong>key-value stores</strong> are fault-tolerant.
They are backed by a <a href="https://kafka.apache.org/documentation.html#compaction">compacted</a> changelog topic.
The purpose of compacting this topic is to prevent the topic from growing indefinitely, to reduce the storage consumed in the associated Kafka cluster, and to minimize recovery time if a state store needs to be restored from its changelog topic.
</p>
<p>
Similarly, persistent <strong>window stores</strong> are fault-tolerant.
They are backed by a topic that uses both <em>compaction</em> and <em>deletion</em>.
Using deletion in addition to compaction is required for the changelog topics of window stores because of the structure of the message keys that are being sent to the changelog topics: for window stores, the message keys are composite keys that include not only the &quot;normal&quot; key but also window timestamps.
For such composite keys it would not be sufficient to enable just compaction in order to prevent a changelog topic from growing out of bounds.
With deletion enabled, old windows that have expired will be cleaned up by Kafka's log cleaner as the log segments expire.
The default retention setting is <code>Windows#maintainMs()</code> + 1 day. This setting can be overridden by specifying <code>StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG</code> in the <code>StreamsConfig</code>.
</p>
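<p>
For example, the additional retention can be set when assembling your application configuration. The following is a minimal sketch (assuming the rest of the <code>Properties</code> for your application are configured elsewhere):
</p>
<pre class="brush: java;">
Properties streamsConfiguration = new Properties();
// retain window-store changelog records for one extra day beyond the window maintain duration
streamsConfiguration.put(
    StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
    24 * 60 * 60 * 1000L);
</pre>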
<p>
One additional note regarding the use of state stores: any time you open an <code>Iterator</code> from a state store you <em>must</em> call <code>close()</code> on the iterator
when you are done working with it, in order to reclaim resources, or use the iterator from within a try-with-resources statement.
Failing to close an iterator will likely lead to an OOM error.
</p>
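<p>
For example, a try-with-resources block closes the iterator automatically. The following is a minimal sketch, assuming <code>kvStore</code> is a <code>KeyValueStore&lt;String, Long&gt;</code> and <code>context</code> is the <code>ProcessorContext</code>, as in the processor example above:
</p>
<pre class="brush: java;">
// KeyValueIterator implements Closeable, so it is closed automatically here
try (KeyValueIterator&lt;String, Long&gt; iter = kvStore.all()) {
    while (iter.hasNext()) {
        KeyValue&lt;String, Long&gt; entry = iter.next();
        context.forward(entry.key, entry.value.toString());
    }
}
</pre>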
<h4><a id="restoration_progress" href="#restoration_progress">Monitoring the Restoration Progress of Fault-tolerant State Stores</a></h4>
<p>
When starting up your application, any fault-tolerant state stores typically don't need to be restored, because their persisted state can be read from local disk.
But there could be situations when a full restore from the backing changelog topic is required (e.g., a failure wiped out the local state, or your application runs in a stateless environment and persisted data is lost on restarts).
</p>
<p>
If you have a significant amount of data in the changelog topic, the restoration process could take a non-negligible amount of time.
Given that processing of new data won't start until the restoration process is completed, having a window into the progress of restoration is useful.
</p>
<p>
In order to observe the restoration of all state stores, you can provide your application with an instance of the <code>org.apache.kafka.streams.processor.StateRestoreListener</code> interface.
You set the <code>org.apache.kafka.streams.processor.StateRestoreListener</code> by calling the <code>KafkaStreams#setGlobalStateRestoreListener</code> method.
</p>
<p>
A basic implementation example that prints restoration status to the console:
</p>
<pre class="brush: java;">
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

public class ConsoleGlobalRestoreListener implements StateRestoreListener {

    @Override
    public void onRestoreStart(final TopicPartition topicPartition,
                               final String storeName,
                               final long startingOffset,
                               final long endingOffset) {

        System.out.print("Started restoration of " + storeName + " partition " + topicPartition.partition());
        System.out.println(" total records to be restored " + (endingOffset - startingOffset));
    }

    @Override
    public void onBatchRestored(final TopicPartition topicPartition,
                                final String storeName,
                                final long batchEndOffset,
                                final long numRestored) {

        System.out.println("Restored batch " + numRestored + " for " + storeName + " partition " + topicPartition.partition());
    }

    @Override
    public void onRestoreEnd(final TopicPartition topicPartition,
                             final String storeName,
                             final long totalRestored) {

        System.out.println("Restoration complete for " + storeName + " partition " + topicPartition.partition());
    }
}
</pre>
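<p>
You can then register the listener on your <code>KafkaStreams</code> instance before starting it. A minimal sketch, assuming <code>topology</code> and <code>streamsConfiguration</code> are defined as elsewhere in this section:
</p>
<pre class="brush: java;">
KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
// the listener must be registered before calling start()
streams.setGlobalStateRestoreListener(new ConsoleGlobalRestoreListener());
streams.start();
</pre>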
<blockquote>
<p>
The <code>StateRestoreListener</code> instance is shared across all <code>org.apache.kafka.streams.processor.internals.StreamThread</code> instances and it is assumed all methods are stateless.
If any stateful operations are desired, then the user will need to provide synchronization internally.
</p>
</blockquote>
<h4> <a id="disable-changelogs" href="#disable-changelogs">Enable / Disable Fault Tolerance of State Stores (Store Changelogs)</a></h4>
<p>
You can enable or disable fault tolerance for a state store by enabling or disabling, respectively, the changelogging of the store through <code>StoreBuilder#withLoggingEnabled(Map&lt;String, String&gt;)</code>
and <code>StoreBuilder#withLoggingDisabled()</code>.
You can also fine-tune the associated topic’s configuration if needed.
</p>
<p>Example for disabling fault-tolerance:</p>
<pre class="brush: java;">
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

KeyValueBytesStoreSupplier countStoreSupplier = Stores.inMemoryKeyValueStore("Counts");
StoreBuilder builder = Stores.keyValueStoreBuilder(countStoreSupplier,
    Serdes.String(),
    Serdes.Long())
    .withLoggingDisabled(); // disable backing up the store to a changelog topic
</pre>
<blockquote>
<p>If the changelog is disabled, then the attached state store is no longer fault-tolerant and it can't have any standby replicas.</p>
</blockquote>
<p>
Example for enabling fault tolerance, with additional changelog-topic configuration: you can add any log config
from <code>kafka.log.LogConfig</code>. Unrecognized configs will be ignored.
</p>
<pre class="brush: java;">
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

Map&lt;String, String&gt; changelogConfig = new HashMap&lt;&gt;();
// override min.insync.replicas
changelogConfig.put("min.insync.replicas", "1");

KeyValueBytesStoreSupplier countStoreSupplier = Stores.inMemoryKeyValueStore("Counts");
StoreBuilder builder = Stores.keyValueStoreBuilder(countStoreSupplier,
    Serdes.String(),
    Serdes.Long())
    .withLoggingEnabled(changelogConfig); // enable changelogging, with custom changelog settings
</pre>
<h4><a id="implementing-custom-state-stores" href="#implementing-custom-state-stores">Implementing custom State Stores</a></h4>
<p>
Apart from using the built-in state store types, you can also implement your own.
The primary interface to implement for the store is <code>org.apache.kafka.streams.processor.StateStore</code>.
Beyond that, Kafka Streams also has a few extended interfaces such as <code>KeyValueStore</code>.
</p>
<p>
In addition to the actual store, you also need to provide a &quot;factory&quot; for the store by implementing the <code>org.apache.kafka.streams.processor.state.StoreSupplier</code> interface, which Kafka Streams uses to create instances of your store.
</p>
<p>
You also have the option of providing a <code>org.apache.kafka.streams.processor.StateRestoreCallback</code> instance used to restore the state store from its backing changelog topic.
This is done via the <code>org.apache.kafka.streams.processor.ProcessorContext#register</code> call inside the <code>StateStore#init</code> call.
</p>
<pre class="brush: java;">
public void init(ProcessorContext context, StateStore store) {
    context.register(store, false, stateRestoreCallbackInstance);
}
</pre>
<p>
There is an additional interface <code>org.apache.kafka.streams.processor.BatchingStateRestoreCallback</code> that provides bulk restoration semantics vs. the single record-at-a-time restoration semantics offered by the <code>StateRestoreCallback</code> interface.
</p>
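<p>
A minimal sketch of a batching restore callback could look as follows. Note that <code>MyCustomStore</code> and its <code>putAllRaw</code> / <code>putRaw</code> methods are hypothetical stand-ins for your own store implementation:
</p>
<pre class="brush: java;">
import java.util.Collection;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;

public class MyBatchingRestoreCallback implements BatchingStateRestoreCallback {

    // hypothetical handle to the custom store's underlying storage
    private final MyCustomStore&lt;byte[], byte[]&gt; customStore;

    public MyBatchingRestoreCallback(final MyCustomStore&lt;byte[], byte[]&gt; customStore) {
        this.customStore = customStore;
    }

    @Override
    public void restoreAll(final Collection&lt;KeyValue&lt;byte[], byte[]&gt;&gt; records) {
        // write the whole batch of changelog records into the store at once
        customStore.putAllRaw(records);
    }

    @Override
    public void restore(final byte[] key, final byte[] value) {
        // single-record restoration, inherited from StateRestoreCallback
        customStore.putRaw(key, value);
    }
}
</pre>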
<p>
Additionally, there are two abstract classes that implement <code>StateRestoreCallback</code> or <code>BatchingStateRestoreCallback</code> in conjunction with the <code>org.apache.kafka.streams.processor.StateRestoreListener</code> interface (<code>org.apache.kafka.streams.processor.AbstractNotifyingRestoreCallback</code> and <code>org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback</code>, respectively) that allow the state store to receive notifications of restoration progress for that store.
The <code>StateRestoreListener</code> in this case is per state store instance and is used for internal purposes such as updating config settings based on the status of the restoration process.
</p>
<h4><a id="connecting-processors-and-state-stores" href="#connecting-processors-and-state-stores">Connecting Processors and State Stores</a></h4>
<p>
Now that we have defined a processor (<code>WordCountProcessor</code>) and the state stores, we can construct the processor topology by connecting these processors and state stores together using the <code>Topology</code> instance.
In addition, users can add <em>source processors</em> with the specified Kafka topics to generate input data streams into the topology, and <em>sink processors</em> with the specified Kafka topics to generate output data streams out of the topology.
</p>
<pre class="brush: java;">
Topology topology = new Topology();

// add the source processor node that takes Kafka topic "source-topic" as input
topology.addSource("Source", "source-topic")
    // add the WordCountProcessor node which takes the source processor as its upstream processor
    .addProcessor("Process", () -> new WordCountProcessor(), "Source")
    // add the count store associated with the WordCountProcessor processor
    .addStateStore(countStoreSupplier, "Process")
    // add the sink processor node that takes Kafka topic "sink-topic" as output
    // and the WordCountProcessor node as its upstream processor
    .addSink("Sink", "sink-topic", "Process");
</pre>
<p>There are several steps in the above implementation to build the topology, and here is a quick walk-through:</p>
<ul>
<li>A source processor node named &quot;Source&quot; is added to the topology using the <code>addSource</code> method, with one Kafka topic &quot;source-topic&quot; fed to it.</li>
<li>A processor node named &quot;Process&quot; with the pre-defined <code>WordCountProcessor</code> logic is then added as the downstream processor of the &quot;Source&quot; node using the <code>addProcessor</code> method.</li>
<li>A predefined persistent key-value state store is created and associated with the &quot;Process&quot; node, using <code>countStoreSupplier</code>.</li>
<li>A sink processor node is then added to complete the topology using the <code>addSink</code> method, taking the &quot;Process&quot; node as its upstream processor and writing to a separate &quot;sink-topic&quot; Kafka topic.</li>
</ul>
<p>
In this topology, the &quot;Process&quot; stream processor node is considered a downstream processor of the &quot;Source&quot; node, and an upstream processor of the &quot;Sink&quot; node.
As a result, whenever the &quot;Source&quot; node forwards a newly fetched record from Kafka to its downstream &quot;Process&quot; node, the <code>WordCountProcessor#process()</code> method is triggered to process the record and update the associated state store; and whenever <code>context#forward()</code> is called in the <code>Punctuator#punctuate()</code> method, the aggregated key-value pair will be sent via the &quot;Sink&quot; processor node to the Kafka topic &quot;sink-topic&quot;.
Note that in the <code>WordCountProcessor</code> implementation, users need to refer to the same store name &quot;Counts&quot; when accessing the key-value store, otherwise an exception will be thrown at runtime indicating that the state store cannot be found. Also, if the state store itself is not associated with the processor in the <code>Topology</code> code, accessing it in the processor's <code>init()</code> method will throw an exception at runtime indicating that the state store is not accessible from this processor.
</p>
<h4><a id="streams_processor_describe" href="#streams_processor_describe">Describe a <code>Topology</code></a></h4>
<p>
After a <code>Topology</code> is specified it is possible to retrieve a description of the corresponding DAG via <code>#describe()</code> that returns a <code>TopologyDescription</code>.
A <code>TopologyDescription</code> contains all added source, processor, and sink nodes as well as all attached stores.
For source and sink nodes one can access the specified input/output topic name/pattern.
For processor nodes the attached stores are added to the description.
Additionally, all nodes have a list to all their connected successor and predecessor nodes.
Thus, <code>TopologyDescription</code> allows you to retrieve the DAG structure of the specified topology.
<br />
Note that global stores are listed explicitly as they are accessible by all nodes without the need to explicitly connect them.
Furthermore, nodes are grouped by <code>Sub-topologies</code>, where each sub-topology is a group of processor nodes that are directly connected to each other (i.e., either by a direct connection&mdash;but not a topic&mdash;or by sharing a store).
During execution, each <code>Sub-topology</code> will be processed by <a href="/{{version}}/documentation/streams/architecture#streams_architecture_tasks">one or multiple tasks</a>.
Thus, each <code>Sub-topology</code> describes an independent unit of work that can be executed by different threads in parallel.
<br />
Describing a <code>Topology</code> before starting your streams application with the specified topology is helpful to reason about tasks and thus maximum parallelism (we will talk about how to execute your written application later in this section).
It is also helpful to get insight into a <code>Topology</code> if it is not specified directly as described above but via the Kafka Streams DSL (we will describe the DSL in the next section).
</p>
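<p>
For example, the description of the word-count topology defined earlier can be retrieved and printed as follows (a minimal sketch):
</p>
<pre class="brush: java;">
// `topology` is the Topology instance built above
TopologyDescription description = topology.describe();
// print the sub-topologies, their nodes, and any global stores
System.out.println(description);
</pre>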
In the next section we present another way to build the processor topology: the Kafka Streams DSL.
<br>
<h3><a id="streams_dsl" href="#streams_dsl">High-Level Streams DSL</a></h3>
To build a <code>Topology</code> using the Streams DSL, developers can apply the <code>StreamsBuilder</code> class.
A simple example is included with the source code for Kafka in the <code>streams/examples</code> package. The rest of this section will walk
through some code to demonstrate the key steps in creating a topology using the Streams DSL, but we recommend that developers read the full example source
code for details.
<h4><a id="streams_duality" href="#streams_duality">Duality of Streams and Tables</a></h4>
<p>
Before we discuss concepts such as aggregations in Kafka Streams we must first introduce tables, and most importantly the relationship between tables and streams:
the so-called <a href="https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying/">stream-table duality</a>.
Essentially, this duality means that a stream can be viewed as a table, and vice versa. Kafka's log compaction feature, for example, exploits this duality.
</p>
<p>
A simple form of a table is a collection of key-value pairs, also called a map or associative array. Such a table may look as follows:
</p>
<img class="centered" src="/{{version}}/images/streams-table-duality-01.png">
The <b>stream-table duality</b> describes the close relationship between streams and tables.
<ul>
<li><b>Stream as Table</b>: A stream can be considered a changelog of a table, where each data record in the stream captures a state change of the table. A stream is thus a table in disguise, and it can be easily turned into a "real" table by replaying the changelog from beginning to end to reconstruct the table. Similarly, in a more general analogy, aggregating data records in a stream - such as computing the total number of pageviews by user from a stream of pageview events - will return a table (here with the key and the value being the user and its corresponding pageview count, respectively).</li>
<li><b>Table as Stream</b>: A table can be considered a snapshot, at a point in time, of the latest value for each key in a stream (a stream's data records are key-value pairs). A table is thus a stream in disguise, and it can be easily turned into a "real" stream by iterating over each key-value entry in the table.</li>
</ul>
<p>
Let's illustrate this with an example. Imagine a table that tracks the total number of pageviews by user (first column of diagram below). Over time, whenever a new pageview event is processed, the state of the table is updated accordingly. Here, the state changes between different points in time - and different revisions of the table - can be represented as a changelog stream (second column).
</p>
<img class="centered" src="/{{version}}/images/streams-table-duality-02.png" style="width:300px">
<p>
Interestingly, because of the stream-table duality, the same stream can be used to reconstruct the original table (third column):
</p>
<img class="centered" src="/{{version}}/images/streams-table-duality-03.png" style="width:600px">
<p>
The same mechanism is used, for example, to replicate databases via change data capture (CDC) and, within Kafka Streams, to replicate its so-called state stores across machines for fault-tolerance.
The stream-table duality is such an important concept that Kafka Streams models it explicitly via the <a href="#streams_kstream_ktable">KStream, KTable, and GlobalKTable</a> interfaces, which we describe in the next sections.
</p>
<h5><a id="streams_kstream_ktable" href="#streams_kstream_ktable">KStream, KTable, and GlobalKTable</a></h5>
The DSL uses three main abstractions. A <b>KStream</b> is an abstraction of a record stream, where each data record represents a self-contained datum in the unbounded data set.
A <b>KTable</b> is an abstraction of a changelog stream, where each data record represents an update. More precisely, the value in a data record is considered to be an update of the last value for the same record key,
if any (if a corresponding key doesn't exist yet, the update will be considered a create).
Like a <b>KTable</b>, a <b>GlobalKTable</b> is an abstraction of a changelog stream, where each data record represents an update.
However, a <b>GlobalKTable</b> is different from a <b>KTable</b> in that it is fully replicated on each KafkaStreams instance.
<b>GlobalKTable</b> also provides the ability to look up current values of data records by keys.
This table-lookup functionality is available through <a href="#streams_dsl_joins">join operations</a>.
To illustrate the difference between KStreams and KTables/GlobalKTables, let's imagine the following two data records are being sent to the stream:
<pre>
("alice", 1) --> ("alice", 3)
</pre>
If the stream is defined as a KStream and the stream processing application were to sum the values it would return <code>4</code>. If the stream is defined as a KTable or GlobalKTable, the return would be <code>3</code>, since the last record would be considered as an update.
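<p>
The following sketch illustrates this difference in code, assuming a hypothetical topic <code>&quot;user-clicks&quot;</code> with <code>String</code> keys and <code>Long</code> values that receives exactly the two records above (shown as two separate topologies, since the same topic is normally registered only once per builder):
</p>
<pre class="brush: java;">
// Interpreted as a record stream: both records are independent facts,
// so summing the values per key yields ("alice", 4).
StreamsBuilder streamBuilder = new StreamsBuilder();
KTable&lt;String, Long&gt; sumFromStream = streamBuilder
    .stream("user-clicks", Consumed.with(Serdes.String(), Serdes.Long()))
    .groupByKey()
    .reduce((value1, value2) -> value1 + value2);

// Interpreted as a changelog stream: the second record overwrites the first,
// so the table ends up with ("alice", 3).
StreamsBuilder tableBuilder = new StreamsBuilder();
KTable&lt;String, Long&gt; latestValue = tableBuilder
    .table("user-clicks", Consumed.with(Serdes.String(), Serdes.Long()));
</pre>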
<h4><a id="streams_dsl_source" href="#streams_dsl_source">Creating Source Streams from Kafka</a></h4>
<p>
You can easily read data from Kafka topics into your application. We support the following operations.
</p>
<table class="data-table" border="1">
<tbody><tr>
<th>Reading from Kafka</th>
<th>Description</th>
</tr>
<tr>
<td><b>Stream</b>: input topic(s) &rarr; <code>KStream</code></td>
<td>Create a <code>KStream</code> from the specified Kafka input topic(s), interpreting the data as a record stream.
A <code>KStream</code> represents a partitioned record stream.
<p>
Slightly simplified, in the case of a KStream, the local KStream instance of every application instance will be populated
with data from only a <b>subset</b> of the partitions of the input topic. Collectively, i.e. across all application instances,
all the partitions of the input topic will be read and processed.
</p>
<pre class="brush: java;">
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();

KStream&lt;String, Long&gt; wordCounts = builder.stream(
    "word-counts-input-topic", /* input topic */
    Consumed.with(Serdes.String(), Serdes.Long())); // define key and value serdes
</pre>
When to provide serdes explicitly:
<ul>
<li>If you do not specify serdes explicitly, the default serdes from the configuration are used.</li>
<li>You must specify serdes explicitly if the key and/or value types of the records in the Kafka input topic(s) do not match
the configured default serdes. </li>
</ul>
Several variants of <code>stream</code> exist to e.g. specify a regex pattern for input topics to read from.</td>
</tr>
<tr>
<td><b>Table</b>: input topic(s) &rarr; <code>KTable</code></td>
<td>
Reads the specified Kafka input topic into a <code>KTable</code>. The topic is interpreted as a changelog stream,
where records with the same key are interpreted as UPSERT aka INSERT/UPDATE (when the record value is not <code>null</code>) or
as DELETE (when the value is null) for that key.
<p>
Slightly simplified, in the case of a KTable, the local KTable instance of every application instance will be populated
with data from only a subset of the partitions of the input topic. Collectively, i.e. across all application instances, all
the partitions of the input topic will be read and processed.
</p>
<p>
You may provide an optional name for the table (more precisely, for the internal state store that backs the table).
When a name is provided, the table can be queried using <a href="#streams_interactive_queries">interactive queries</a>.
When a name is not provided, the table will not be queryable and an internal name will be used for the state store.
</p>
<pre class="brush: java;">
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
StreamsBuilder builder = new StreamsBuilder();
KTable&lt;String, Long&gt; wordCounts = builder.table(
Serdes.String(), /* key serde */
Serdes.Long(), /* value serde */
"word-counts-input-topic", /* input topic */
"word-counts-partitioned-store" /* table/store name */);
</pre>
When to provide serdes explicitly:
<ul>
<li>If you do not specify serdes explicitly, the default serdes from the configuration are used.</li>
<li>You must specify serdes explicitly if the key and/or value types of the records in the Kafka input topic do not
match the configured default serdes.</li>
</ul>
Several variants of <code>table</code> exist to e.g. specify the <code>auto.offset.reset</code>
policy to be used when reading from the input topic.
</td>
</tr>
<tr>
<td><b>Global Table</b>: input topic &rarr; <code>GlobalKTable</code></td>
<td>
Reads the specified Kafka input topic into a <code>GlobalKTable</code>. The topic is interpreted as a changelog stream, where records
with the same key are interpreted as UPSERT aka INSERT/UPDATE (when the record value is not <code>null</code>) or as DELETE (when the
value is <code>null</code>) for that key.
<p>
Slightly simplified, in the case of a GlobalKTable, the local GlobalKTable instance of every application instance will be
populated with data from all the partitions of the input topic. In other words, when using a global table, every application
instance will get its own, full copy of the topic's data.
</p>
<p>
You may provide an optional name for the table (more precisely, for the internal state store that backs the table).
When a name is provided, the table can be queried using <a href="#streams_interactive_queries">interactive queries</a>.
When a name is not provided, the table will not be queryable and an internal name will be used for the state store.
</p>
<pre class="brush: java;">
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
StreamsBuilder builder = new StreamsBuilder();
GlobalKTable&lt;String, Long&gt; wordCounts = builder.globalTable(
Serdes.String(), /* key serde */
Serdes.Long(), /* value serde */
"word-counts-input-topic", /* input topic */
"word-counts-global-store" /* table/store name */);
</pre>
When to provide serdes explicitly:
<ul>
<li>If you do not specify serdes explicitly, the default serdes from the configuration are used.</li>
<li>You must specify serdes explicitly if the key and/or value types of the records in the Kafka input topic do not
match the configured default serdes.</li>
</ul>
Several variants of <code>globalTable</code> exist to e.g. specify explicit serdes.
</td>
</tr>
</tbody>
</table>
<h4><a id="streams_dsl_transform" href="#streams_dsl_transform">Transform a stream</a></h4>
<p>
<code>KStream</code> and <code>KTable</code> support a variety of transformation operations. Each of these operations
can be translated into one or more connected processors in the underlying processor topology. Since <code>KStream</code>
and <code>KTable</code> are strongly typed, all these transformation operations are defined as generic functions where
users can specify the input and output data types.
</p>
<p>
Some <code>KStream</code> transformations may generate one or more <code>KStream</code> objects (e.g., filter and
map on <code>KStream</code> generate another <code>KStream</code>, while branch on <code>KStream</code> can generate
multiple <code>KStream</code> instances), while others may generate a <code>KTable</code> object (e.g., aggregation), interpreted
as the changelog stream of the resulting table. This allows Kafka Streams to continuously update the computed value upon the arrival
of late records after the value has already been produced to downstream transformation operators. As for <code>KTable</code>,
all its transformation operations can only generate another <code>KTable</code> (though the Kafka Streams DSL does
provide a special function to convert a <code>KTable</code> representation into a <code>KStream</code>, which we will
describe later). Nevertheless, all these transformation methods can be chained together to compose a complex processor topology.
</p>
<p>
We describe these transformation operations in the following subsections, categorizing them into two categories:
stateless and stateful transformations.
</p>
<h5><a id="streams_dsl_transformations_stateless" href="#streams_dsl_transformations_stateless">Stateless transformations</a></h5>
<p>
Stateless transformations, by definition, do not depend on any state for processing, and hence implementation-wise they do not
require a state store associated with the stream processor.
</p>
<table class="data-table" border="1">
<tbody><tr>
<th>Transformation</th>
<th>Description</th>
</tr>
<tr>
<td><b>Branch</b>: <code>KStream &rarr; KStream</code></td>
<td>
<p>
Branch (or split) a <code>KStream</code> based on the supplied predicates into one or more <code>KStream</code> instances.
</p>
<p>
Predicates are evaluated in order. A record is placed into one and only one output stream on the first match:
if the n-th predicate evaluates to true, the record is placed into the n-th stream. If no predicate matches,
the record is dropped.
</p>
<p>
Branching is useful, for example, to route records to different downstream topics.
</p>
<pre class="brush: java;">
KStream&lt;String, Long&gt; stream = ...;
KStream&lt;String, Long&gt;[] branches = stream.branch(
(key, value) -> key.startsWith("A"), /* first predicate */
(key, value) -> key.startsWith("B"), /* second predicate */
(key, value) -> true /* third predicate */
);
// KStream branches[0] contains all records whose keys start with "A"
// KStream branches[1] contains all records whose keys start with "B"
// KStream branches[2] contains all other records
// Java 7 example: cf. `filter` for how to create `Predicate` instances
</pre>
</td>
</tr>
<tr>
<td><b>Filter</b>: <code>KStream &rarr; KStream or KTable &rarr; KTable</code></td>
<td>
<p>
Evaluates a boolean function for each element and retains those for which the function returns true.
</p>
<pre class="brush: java;">
KStream&lt;String, Long&gt; stream = ...;
KTable&lt;String, Long&gt; table = ...;
// A filter that selects (keeps) only positive numbers
// Java 8+ example, using lambda expressions
KStream&lt;String, Long&gt; onlyPositives = stream.filter((key, value) -> value > 0);
// Java 7 example
KStream&lt;String, Long&gt; onlyPositives = stream.filter(
new Predicate&lt;String, Long&gt;() {
@Override
public boolean test(String key, Long value) {
return value > 0;
}
});
// A filter on a KTable that materializes the result into a StateStore
table.filter((key, value) -> value != 0, Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("filtered"));
</pre>
</td>
</tr>
<tr>
<td><b>Inverse Filter</b>: <code>KStream &rarr; KStream or KTable &rarr; KTable</code></td>
<td>
<p>
Evaluates a boolean function for each element and drops those for which the function returns true.
</p>
<pre class="brush: java;">
KStream&lt;String, Long&gt; stream = ...;
// An inverse filter that discards any negative numbers or zero
// Java 8+ example, using lambda expressions
KStream&lt;String, Long&gt; onlyPositives = stream.filterNot((key, value) -> value <= 0);
// Java 7 example
KStream&lt;String, Long&gt; onlyPositives = stream.filterNot(
new Predicate&lt;String, Long&gt;() {
@Override
public boolean test(String key, Long value) {
return value <= 0;
}
});
</pre>
</td>
</tr>
<tr>
<td><b>FlatMap</b>: <code>KStream &rarr; KStream </code></td>
<td>
<p>
Takes one record and produces zero, one, or more records. You can modify the record keys and values,
including their types.
</p>
<p>
Marks the stream for data re-partitioning: Applying a grouping or a join after <code>flatMap</code> will result in
re-partitioning of the records. If possible use <code>flatMapValues</code> instead, which will not cause data re-partitioning.
</p>
<pre class="brush: java;">
KStream&lt;Long, String> stream = ...;
KStream&lt;String, Integer&gt; transformed = stream.flatMap(
// Here, we generate two output records for each input record.
// We also change the key and value types.
// Example: (345L, "Hello") -> ("HELLO", 1000), ("hello", 9000)
(key, value) -> {
List&lt;KeyValue&lt;String, Integer&gt;&gt; result = new LinkedList&lt;&gt;();
result.add(KeyValue.pair(value.toUpperCase(), 1000));
result.add(KeyValue.pair(value.toLowerCase(), 9000));
return result;
}
);
// Java 7 example: cf. `map` for how to create `KeyValueMapper` instances
</pre>
</td>
</tr>
<tr>
<td><b>FlatMap (values only)</b>: <code>KStream &rarr; KStream </code></td>
<td>
<p>
Takes one record and produces zero, one, or more records, while retaining the key of the original record.
You can modify the record values and the value type.
</p>
<p>
<code>flatMapValues</code> is preferable to <code>flatMap</code> because it will not cause data re-partitioning. However,
it does not allow you to modify the key or key type like <code>flatMap</code> does.
</p>
<pre class="brush: java;">
// Split a sentence into words.
KStream&lt;byte[], String&gt; sentences = ...;
KStream&lt;byte[], String&gt; words = sentences.flatMapValues(value -> Arrays.asList(value.split("\\s+")));
// Java 7 example: cf. `mapValues` for how to create `ValueMapper` instances
</pre>
</td>
</tr>
<tr>
<td><b>Foreach</b>: <code>KStream &rarr; void </code></td>
<td>
<p>
Terminal operation. Performs a stateless action on each record.
</p>
<p>
Note on processing guarantees: Any side effects of an action (such as writing to external systems)
are not trackable by Kafka, which means they will typically not benefit from Kafka's processing guarantees.
</p>
<pre class="brush: java;">
KStream&lt;String, Long&gt; stream = ...;
// Print the contents of the KStream to the local console.
// Java 8+ example, using lambda expressions
stream.foreach((key, value) -> System.out.println(key + " => " + value));
// Java 7 example
stream.foreach(
new ForeachAction&lt;String, Long&gt;() {
@Override
public void apply(String key, Long value) {
System.out.println(key + " => " + value);
}
});
</pre>
</td>
</tr>
<tr>
<td><b>GroupByKey</b>: <code>KStream &rarr; KGroupedStream </code></td>
<td>
<p>
Groups the records by the existing key.
</p>
<p>
Grouping is a prerequisite for aggregating a stream or a table and ensures that data is properly
partitioned ("keyed") for subsequent operations.
</p>
<p>
<b>When to set explicit serdes</b>: Variants of <code>groupByKey</code> exist to override the configured default serdes of
your application, which you must do if the key and/or value types of the resulting <code>KGroupedStream</code> do
not match the configured default serdes.
</p>
<p>
<b>Note:</b>
Grouping vs. Windowing: A related operation is windowing, which lets you control how to "sub-group" the
grouped records of the same key into so-called windows for stateful operations such as windowed aggregations
or windowed joins.
</p>
<p>
Causes data re-partitioning if and only if the stream was marked for re-partitioning. <code>groupByKey</code> is
preferable to <code>groupBy</code> because it re-partitions data only if the stream was already marked for re-partitioning.
However, <code>groupByKey</code> does not allow you to modify the key or key type like <code>groupBy</code> does.
</p>
<pre class="brush: java;">
KStream&lt;byte[], String&gt; stream = ...;
// Group by the existing key, using the application's configured
// default serdes for keys and values.
KGroupedStream&lt;byte[], String&gt; groupedStream = stream.groupByKey();
// When the key and/or value types do not match the configured
// default serdes, we must explicitly specify serdes.
KGroupedStream&lt;byte[], String&gt; groupedStream = stream.groupByKey(
Serialized.with(
Serdes.ByteArray(), /* key */
Serdes.String()) /* value */
);
</pre>
</td>
</tr>
<tr>
<td><b>GroupBy</b>: <code>KStream &rarr; KGroupedStream or KTable &rarr; KGroupedTable</code></td>
<td>
<p>
Groups the records by a new key, which may be of a different key type. When grouping a table,
you may also specify a new value and value type. groupBy is a shorthand for selectKey(...).groupByKey().
</p>
<p>
Grouping is a prerequisite for aggregating a stream or a table and ensures that data is properly
partitioned ("keyed") for subsequent operations.
</p>
<p>
<b>When to set explicit serdes</b>: Variants of groupBy exist to override the configured default serdes of your
application, which you must do if the key and/or value types of the resulting KGroupedStream or
KGroupedTable do not match the configured default serdes.
</p>
<p>
<b>Note:</b>
Grouping vs. Windowing: A related operation is windowing, which lets you control how to “sub-group” the
grouped records of the same key into so-called windows for stateful operations such as windowed aggregations
or windowed joins.
</p>
<p>
<b>Always causes data re-partitioning:</b> groupBy always causes data re-partitioning. If possible use groupByKey
instead, which will re-partition data only if required.
</p>
<pre class="brush: java;">
KStream&lt;byte[], String&gt; stream = ...;
KTable&lt;byte[], String&gt; table = ...;

// Java 8+ examples, using lambda expressions

// Group the stream by a new key and key type
KGroupedStream&lt;String, String&gt; groupedStream = stream.groupBy(
    (key, value) -> value,
    Serialized.with(
        Serdes.String(), /* key (note: type was modified) */
        Serdes.String()) /* value */
);

// Group the table by a new key and key type, and also modify the value and value type.
KGroupedTable&lt;String, Integer&gt; groupedTable = table.groupBy(
    (key, value) -> KeyValue.pair(value, value.length()),
    Serialized.with(
        Serdes.String(),  /* key (note: type was modified) */
        Serdes.Integer()) /* value (note: type was modified) */
);

// Java 7 examples

// Group the stream by a new key and key type
KGroupedStream&lt;String, String&gt; groupedStream = stream.groupBy(
    new KeyValueMapper&lt;byte[], String, String&gt;() {
        @Override
        public String apply(byte[] key, String value) {
            return value;
        }
    },
    Serialized.with(
        Serdes.String(), /* key (note: type was modified) */
        Serdes.String()) /* value */
);

// Group the table by a new key and key type, and also modify the value and value type.
KGroupedTable&lt;String, Integer&gt; groupedTable = table.groupBy(
    new KeyValueMapper&lt;byte[], String, KeyValue&lt;String, Integer&gt;&gt;() {
        @Override
        public KeyValue&lt;String, Integer&gt; apply(byte[] key, String value) {
            return KeyValue.pair(value, value.length());
        }
    },
    Serialized.with(
        Serdes.String(),  /* key (note: type was modified) */
        Serdes.Integer()) /* value (note: type was modified) */
);
</pre>
</td>
</tr>
<tr>
<td><b>Map</b>: <code>KStream &rarr; KStream</code></td>
<td>
<p>
Takes one record and produces one record. You can modify the record key and value, including their types.
</p>
<p>
<b>Marks the stream for data re-partitioning:</b> Applying a grouping or a join after <code>map</code> will result in
re-partitioning of the records. If possible use <code>mapValues</code> instead, which will not cause data re-partitioning.
</p>
<pre class="brush: java;">
KStream&lt;byte[], String&gt; stream = ...;
// Java 8+ example, using lambda expressions
// Note how we change the key and the key type (similar to `selectKey`)
// as well as the value and the value type.
KStream&lt;String, Integer&gt; transformed = stream.map(
(key, value) -> KeyValue.pair(value.toLowerCase(), value.length()));
// Java 7 example
KStream&lt;String, Integer&gt; transformed = stream.map(
new KeyValueMapper&lt;byte[], String, KeyValue&lt;String, Integer&gt;&gt;() {
@Override
public KeyValue&lt;String, Integer&gt; apply(byte[] key, String value) {
return new KeyValue&lt;&gt;(value.toLowerCase(), value.length());
}
});
</pre>
</td>
</tr>
<tr>
<td><b>Map (values only)</b>: <code>KStream &rarr; KStream or KTable &rarr; KTable</code></td>
<td>
<p>
Takes one record and produces one record, while retaining the key of the original record. You can modify
the record value and the value type.
</p>
<p>
<code>mapValues</code> is preferable to <code>map</code> because it will not cause data re-partitioning. However, it does not
allow you to modify the key or key type like <code>map</code> does.
</p>
<pre class="brush: java;">
KStream&lt;byte[], String&gt; stream = ...;
KTable&lt;String, String&gt; table = ...;

// Java 8+ example, using lambda expressions
KStream&lt;byte[], String&gt; uppercased = stream.mapValues(value -> value.toUpperCase());

// Java 7 example
KStream&lt;byte[], String&gt; uppercased = stream.mapValues(
    new ValueMapper&lt;String, String&gt;() {
        @Override
        public String apply(String s) {
            return s.toUpperCase();
        }
    });

// mapValues on a KTable, also materializing the result into a state store
table.mapValues(value -> value.toUpperCase(), Materialized.&lt;String, String, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("uppercased"));
</pre>
</td>
</tr>
<tr>
<td><b>Print</b>: <code>KStream &rarr; void </code></td>
<td>
<p>
Terminal operation. Prints the records to <code>System.out</code>. See Javadocs for serde and <code>toString()</code> caveats.
</p>
<pre class="brush: java;">
KStream&lt;byte[], String&gt; stream = ...;
stream.print();
// You can also override how and where the data is printed, e.g., to a file:
stream.print(Printed.toFile("stream.out"));
// with a custom KeyValueMapper and label
stream.print(Printed.toSysOut()
.withLabel("my-stream")
.withKeyValueMapper((key, value) -> key + " -> " + value));
</pre>
</td>
</tr>
<tr>
<td><b>SelectKey</b>: <code>KStream &rarr; KStream</code></td>
<td>
<p>
Assigns a new key, possibly of a new key type, to each record.
</p>
<p>
Marks the stream for data re-partitioning: Applying a grouping or a join after <code>selectKey</code> will result in
re-partitioning of the records.
</p>
<pre class="brush: java;">
KStream&lt;byte[], String&gt; stream = ...;

// Derive a new record key from the record's value. Note how the key type changes, too.
// Java 8+ example, using lambda expressions
KStream&lt;String, String&gt; rekeyed = stream.selectKey((key, value) -> value.split(" ")[0]);

// Java 7 example
KStream&lt;String, String&gt; rekeyed = stream.selectKey(
    new KeyValueMapper&lt;byte[], String, String&gt;() {
        @Override
        public String apply(byte[] key, String value) {
            return value.split(" ")[0];
        }
    });
</pre>
</td>
</tr>
<tr>
<td><b>Table to Stream</b>: <code>KTable &rarr; KStream</code></td>
<td>
<p>
Converts this table into a stream.
</p>
<pre class="brush: java;">
KTable&lt;byte[], String> table = ...;
// Also, a variant of `toStream` exists that allows you
// to select a new key for the resulting stream.
KStream&lt;byte[], String> stream = table.toStream();
</pre>
</td>
</tr>
<tr>
<td><b>WriteAsText</b>: <code>KStream &rarr; void </code></td>
<td>
<p>
Terminal operation. Write the records to a file. See Javadocs for serde and <code>toString()</code> caveats.
</p>
<pre class="brush: java;">
KStream&lt;byte[], String&gt; stream = ...;
stream.writeAsText("/path/to/local/output.txt");
// Several variants of `writeAsText` exist to e.g. override the
// default serdes for record keys and record values.
stream.writeAsText("/path/to/local/output.txt", Serdes.ByteArray(), Serdes.String());
</pre>
</td>
</tr>
</tbody>
</table>
<h5><a id="streams_dsl_transformations_stateful" href="#streams_dsl_transformations_stateful">Stateful transformations</a></h5>
<h6><a id="streams_dsl_transformations_stateful_overview" href="#streams_dsl_transformations_stateful_overview">Overview</a></h6>
<p>
Stateful transformations, by definition, depend on state for processing inputs and producing outputs, and
hence implementation-wise they require a state store associated with the stream processor. For example,
in aggregating operations, a windowing state store is used to store the latest aggregation results per window;
in join operations, a windowing state store is used to store all the records received so far within the
defined window boundary.
</p>
<p>
Note, that state stores are fault-tolerant. In case of failure, Kafka Streams guarantees to fully restore
all state stores prior to resuming the processing.
</p>
<p>
Available stateful transformations in the DSL include:
<ul>
<li><a href=#streams_dsl_aggregations>Aggregating</a></li>
<li><a href="#streams_dsl_joins">Joining</a></li>
<li><a href="#streams_dsl_windowing">Windowing (as part of aggregations and joins)</a></li>
<li>Applying custom processors and transformers, which may be stateful, for Processor API integration</li>
</ul>
</p>
<p>
The following diagram shows their relationships:
</p>
<figure>
<img class="centered" src="/{{version}}/images/streams-stateful_operations.png" style="width:500pt;">
<figcaption style="text-align: center;"><i>Stateful transformations in the DSL</i></figcaption>
</figure>
<p>
We will discuss the various stateful transformations in detail in the subsequent sections. However, let's start
with a first example of a stateful application: the canonical WordCount algorithm.
</p>
<p>
WordCount example in Java 8+, using lambda expressions:
</p>
<pre class="brush: java;">
// We assume record values represent lines of text. For the sake of this example, we ignore
// whatever may be stored in the record keys.
KStream&lt;String, String&gt; textLines = ...;

KStream&lt;String, Long&gt; wordCounts = textLines
    // Split each text line, by whitespace, into words. The text lines are the record
    // values, i.e. we can ignore whatever data is in the record keys and thus invoke
    // `flatMapValues` instead of the more generic `flatMap`.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    // Group the stream by word to ensure the key of the record is the word.
    .groupBy((key, word) -> word)
    // Count the occurrences of each word (record key).
    //
    // This will change the stream type from `KGroupedStream&lt;String, String&gt;` to
    // `KTable&lt;String, Long&gt;` (word -> count). We must provide a name for
    // the resulting KTable, which will be used to name e.g. its associated
    // state store and changelog topic.
    .count("Counts")
    // Convert the `KTable&lt;String, Long&gt;` into a `KStream&lt;String, Long&gt;`.
    .toStream();
</pre>
<p>
WordCount example in Java 7:
</p>
<pre class="brush: java;">
// Code below is equivalent to the previous Java 8+ example above.
KStream&lt;String, String&gt; textLines = ...;

KStream&lt;String, Long&gt; wordCounts = textLines
    .flatMapValues(new ValueMapper&lt;String, Iterable&lt;String&gt;&gt;() {
        @Override
        public Iterable&lt;String&gt; apply(String value) {
            return Arrays.asList(value.toLowerCase().split("\\W+"));
        }
    })
    .groupBy(new KeyValueMapper&lt;String, String, String&gt;() {
        @Override
        public String apply(String key, String word) {
            return word;
        }
    })
    .count("Counts")
    .toStream();
</pre>
<h6><a id="streams_dsl_aggregations" href="#streams_dsl_aggregations">Aggregate a stream</a></h6>
<p>
Once records are grouped by key via <code>groupByKey</code> or <code>groupBy</code> -- and
thus represented as either a <code>KGroupedStream</code> or a
<code>KGroupedTable</code> -- they can be aggregated via an operation such as
<code>reduce</code>.
For windowed aggregations use <code>windowedBy(Windows).reduce(Reducer)</code>.
Aggregations are <i>key-based</i> operations, i.e. they always operate over records (notably record values) <i>of the same key</i>.
You may choose to perform aggregations on
<a href="#streams_dsl_windowing">windowed</a> or non-windowed data.
</p>
<table class="data-table" border="1">
<tbody>
<tr>
<th>Transformation</th>
<th>Description</th>
</tr>
<tr>
<td><b>Aggregate</b>: <code>KGroupedStream &rarr; KTable</code> or <code>KGroupedTable
&rarr; KTable</code></td>
<td>
<p>
<b>Rolling aggregation</b>. Aggregates the values of (non-windowed) records by
the grouped key. Aggregating is a generalization of <code>reduce</code> and allows, for example, the
aggregate value to have a different type than the input values.
</p>
<p>
When aggregating a grouped stream, you must provide an initializer (think:
<code>aggValue = 0</code>) and an "adder"
aggregator (think: <code>aggValue + curValue</code>). When aggregating a <i>grouped</i>
table, you must additionally provide a "subtractor" aggregator (think: <code>aggValue - oldValue</code>).
</p>
<p>
Several variants of <code>aggregate</code> exist, see Javadocs for details.
</p>
<pre class="brush: java;">
KGroupedStream&lt;Bytes, String&gt; groupedStream = ...;
KGroupedTable&lt;Bytes, String&gt; groupedTable = ...;
// Java 8+ examples, using lambda expressions
// Aggregating a KGroupedStream (note how the value type changes from String to Long)
KTable&lt;Bytes, Long&gt; aggregatedStream = groupedStream.aggregate(
() -> 0L, /* initializer */
(aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */
Serdes.Long(), /* serde for aggregate value */
"aggregated-stream-store" /* state store name */);
// Aggregating a KGroupedTable (note how the value type changes from String to Long)
KTable&lt;Bytes, Long&gt; aggregatedTable = groupedTable.aggregate(
() -> 0L, /* initializer */
(aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */
(aggKey, oldValue, aggValue) -> aggValue - oldValue.length(), /* subtractor */
Serdes.Long(), /* serde for aggregate value */
"aggregated-table-store" /* state store name */);
// windowed aggregation
KTable&lt;Windowed&lt;Bytes&gt;, Long&gt; windowedAggregate = groupedStream.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
.aggregate(() -> 0L, /* initializer */
(aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* aggregator */
Materialized.with(Serdes.Bytes(), Serdes.Long())); /* key and aggregate value serdes */
// Java 7 examples
// Aggregating a KGroupedStream (note how the value type changes from String to Long)
KTable&lt;Bytes, Long&gt; aggregatedStream = groupedStream.aggregate(
new Initializer&lt;Long&gt;() { /* initializer */
@Override
public Long apply() {
return 0L;
}
},
new Aggregator&lt;Bytes, String, Long&gt;() { /* adder */
@Override
public Long apply(Bytes aggKey, String newValue, Long aggValue) {
return aggValue + newValue.length();
}
},
Serdes.Long(),
"aggregated-stream-store");
// Aggregating a KGroupedTable (note how the value type changes from String to Long)
KTable&lt;Bytes, Long&gt; aggregatedTable = groupedTable.aggregate(
new Initializer&lt;Long&gt;() { /* initializer */
@Override
public Long apply() {
return 0L;
}
},
new Aggregator&lt;Bytes, String, Long&gt;() { /* adder */
@Override
public Long apply(Bytes aggKey, String newValue, Long aggValue) {
return aggValue + newValue.length();
}
},
new Aggregator&lt;Bytes, String, Long&gt;() { /* subtractor */
@Override
public Long apply(Bytes aggKey, String oldValue, Long aggValue) {
return aggValue - oldValue.length();
}
},
Serdes.Long(),
"aggregated-table-store");
// Windowed aggregation
KTable&lt;Windowed&lt;Bytes&gt;, Long&gt; windowedAggregate = groupedStream.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
.aggregate(
new Initializer&lt;Long&gt;() { /* initializer */
@Override
public Long apply() {
return 0L;
}
},
new Aggregator&lt;Bytes, String, Long&gt;() { /* adder */
@Override
public Long apply(Bytes aggKey, String newValue, Long aggValue) {
return aggValue + newValue.length();
}
},
Materialized.&lt;Bytes, Long, WindowStore&lt;Bytes, byte[]&gt;&gt;with(Serdes.Bytes(), Serdes.Long())); /* key and aggregate value serdes */
</pre>
<p>
Detailed behavior of <code>KGroupedStream</code>:
</p>
<ul>
<li>Input records with <code>null</code> keys are ignored in general.</li>
<li>When a record key is received for the first time, the initializer is called
(and called before the adder).</li>
<li>Whenever a record with a non-null value is received, the adder is called.</li>
</ul>
<p>
Detailed behavior of <code>KGroupedTable</code>:
</p>
<ul>
<li>Input records with <code>null</code> keys are ignored in general.</li>
<li>When a record key is received for the first time, the initializer is called
(and called before the adder and subtractor). Note that, in contrast to <code>KGroupedStream</code>, over
time the initializer may be called more
than once for a key as a result of having received input tombstone records
for that key (see below).</li>
<li>When the first non-<code>null</code> value is received for a key (think:
INSERT), then only the adder is called.</li>
<li>When subsequent non-<code>null</code> values are received for a key (think:
UPDATE), then (1) the subtractor is called
with the old value as stored in the table and (2) the adder is called with
the new value of the input record
that was just received. The order of execution for the subtractor and adder
is not defined.</li>
<li>When a tombstone record -- i.e. a record with a <code>null</code> value -- is
received for a key (think: DELETE), then
only the subtractor is called. Note that, whenever the subtractor returns a
<code>null</code> value itself, then the
corresponding key is removed from the resulting KTable. If that happens, any
next input record for that key will trigger the initializer again.</li>
</ul>
<p>
See the example at the bottom of this section for a visualization of the
aggregation semantics.
</p>
</td>
</tr>
<tr>
<td><b>Aggregate (windowed)</b>: <code>KGroupedStream &rarr; KTable</code></td>
<td>
<p>
<b>Windowed aggregation</b>. Aggregates the values of records, per window, by
the grouped key. Aggregating is a generalization of
<code>reduce</code> and allows, for example, the aggregate value to have a
different type than the input values.
</p>
<p>
You must provide an initializer (think: <code>aggValue = 0</code>), "adder"
aggregator (think: <code>aggValue + curValue</code>),
and a window. When windowing based on sessions, you must additionally provide a
"session merger" aggregator (think:
<code>mergedAggValue = leftAggValue + rightAggValue</code>).
</p>
<p>
The windowed <code>aggregate</code> turns a <code>KGroupedStream&lt;K, V&gt;</code> into a windowed <code>KTable&lt;Windowed&lt;K&gt;, V&gt;</code>.
</p>
<p>
Several variants of <code>aggregate</code> exist, see Javadocs for details.
</p>
<pre class="brush: java;">
import java.util.concurrent.TimeUnit;
KGroupedStream&lt;String, Long&gt; groupedStream = ...;
// Java 8+ examples, using lambda expressions
// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream.aggregate(
() -> 0L, /* initializer */
(aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */
Serdes.Long(), /* serde for aggregate value */
"time-windowed-aggregated-stream-store" /* state store name */);
// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionizedAggregatedStream = groupedStream.aggregate(
() -> 0L, /* initializer */
(aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
(aggKey, leftAggValue, rightAggValue) -> leftAggValue + rightAggValue, /* session merger */
SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */
Serdes.Long(), /* serde for aggregate value */
"sessionized-aggregated-stream-store" /* state store name */);
// Java 7 examples
// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream.aggregate(
new Initializer&lt;Long&gt;() { /* initializer */
@Override
public Long apply() {
return 0L;
}
},
new Aggregator&lt;String, Long, Long&gt;() { /* adder */
@Override
public Long apply(String aggKey, Long newValue, Long aggValue) {
return aggValue + newValue;
}
},
TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */
Serdes.Long(), /* serde for aggregate value */
"time-windowed-aggregated-stream-store" /* state store name */);
// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionizedAggregatedStream = groupedStream.aggregate(
new Initializer&lt;Long&gt;() { /* initializer */
@Override
public Long apply() {
return 0L;
}
},
new Aggregator&lt;String, Long, Long&gt;() { /* adder */
@Override
public Long apply(String aggKey, Long newValue, Long aggValue) {
return aggValue + newValue;
}
},
new Merger&lt;String, Long&gt;() { /* session merger */
@Override
public Long apply(String aggKey, Long leftAggValue, Long rightAggValue) {
return rightAggValue + leftAggValue;
}
},
SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */
Serdes.Long(), /* serde for aggregate value */
"sessionized-aggregated-stream-store" /* state store name */);
</pre>
<p>
Detailed behavior:
</p>
<ul>
<li>The windowed aggregate behaves similarly to the rolling aggregate described
above. The additional twist is that the behavior applies per window.</li>
<li>Input records with <code>null</code> keys are ignored in general.</li>
<li>When a record key is received for the first time for a given window, the
initializer is called (and called before the adder).</li>
<li>Whenever a record with a non-<code>null</code> value is received for a given window, the
adder is called.
(Note: As a result of a known bug in Kafka 0.11.0.0, the adder is currently
also called for <code>null</code> values. You can work around this, for example, by
manually filtering out <code>null</code> values prior to grouping the stream.)</li>
<li>When using session windows: the session merger is called whenever two
sessions are being merged.</li>
</ul>
<p>
See the example at the bottom of this section for a visualization of the aggregation semantics.
</p>
</td>
</tr>
<tr>
<td><b>Count</b>: <code>KGroupedStream &rarr; KTable</code> or <code>KGroupedTable &rarr; KTable</code></td>
<td>
<p>
<b>Rolling aggregation</b>. Counts the number of records by the grouped key.
Several variants of <code>count</code> exist, see Javadocs for details.
</p>
<pre class="brush: java;">
KGroupedStream&lt;String, Long&gt; groupedStream = ...;
KGroupedTable&lt;String, Long&gt; groupedTable = ...;
// Counting a KGroupedStream
KTable&lt;String, Long&gt; aggregatedStream = groupedStream.count(
"counted-stream-store" /* state store name */);
// Counting a KGroupedTable
KTable&lt;String, Long&gt; aggregatedTable = groupedTable.count(
"counted-table-store" /* state store name */);
</pre>
<p>
Detailed behavior for <code>KGroupedStream</code>:
</p>
<ul>
<li>Input records with null keys or values are ignored.</li>
</ul>
<p>
Detailed behavior for <code>KGroupedTable</code>:
</p>
<ul>
<li>Input records with <code>null</code> keys are ignored. Records with <code>null</code>
values are not ignored but interpreted as "tombstones" for the corresponding key, which
indicate the deletion of the key from the table.</li>
</ul>
</td>
</tr>
<tr>
<td><b>Count (Windowed)</b>: <code>KGroupedStream &rarr; KTable</code></td>
<td>
<p>
Windowed aggregation. Counts the number of records, per window, by the grouped key.
</p>
<p>
The windowed <code>count</code> turns a <code>KGroupedStream&lt;K, V&gt;</code> into a windowed <code>KTable&lt;Windowed&lt;K&gt;, Long&gt;</code>.
</p>
<p>
Several variants of count exist, see Javadocs for details.
</p>
<pre class="brush: java;">
import java.util.concurrent.TimeUnit;
KGroupedStream&lt;String, Long&gt; groupedStream = ...;
// Counting a KGroupedStream with time-based windowing (here: with 5-minute tumbling windows)
KTable&lt;Windowed&lt;String&gt;, Long&gt; aggregatedStream = groupedStream.count(
TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */
"time-windowed-counted-stream-store" /* state store name */);
// Counting a KGroupedStream with session-based windowing (here: with 5-minute inactivity gaps)
KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionizedCountedStream = groupedStream.count(
SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */
"sessionized-counted-stream-store" /* state store name */);
</pre>
<p>
Detailed behavior:
</p>
<ul>
<li>Input records with <code>null</code> keys or values are ignored. (Note: As a result of a known bug in Kafka 0.11.0.0,
records with <code>null</code> values are not ignored yet. You can work around this, for example, by manually
filtering out <code>null</code> values prior to grouping the stream.)</li>
</ul>
</td>
</tr>
<tr>
<td><b>Reduce</b>: <code>KGroupedStream &rarr; KTable</code> or <code>KGroupedTable &rarr; KTable</code></td>
<td>
<p>
<b>Rolling aggregation</b>. Combines the values of (non-windowed) records by the grouped key. The current record value is
combined with the last reduced value, and a new reduced value is returned. The result value type cannot be changed,
unlike <code>aggregate</code>.
</p>
<p>
When reducing a grouped stream, you must provide an "adder" reducer (think: <code>aggValue + curValue</code>).
When reducing a grouped table, you must additionally provide a "subtractor" reducer (think: <code>aggValue - oldValue</code>).
</p>
<p>
Several variants of <code>reduce</code> exist, see Javadocs for details.
</p>
<pre class="brush: java;">
KGroupedStream&lt;String, Long&gt; groupedStream = ...;
KGroupedTable&lt;String, Long&gt; groupedTable = ...;
// Java 8+ examples, using lambda expressions
// Reducing a KGroupedStream
KTable&lt;String, Long&gt; aggregatedStream = groupedStream.reduce(
(aggValue, newValue) -> aggValue + newValue, /* adder */
"reduced-stream-store" /* state store name */);
// Reducing a KGroupedTable
KTable&lt;String, Long&gt; aggregatedTable = groupedTable.reduce(
(aggValue, newValue) -> aggValue + newValue, /* adder */
(aggValue, oldValue) -> aggValue - oldValue, /* subtractor */
"reduced-table-store" /* state store name */);
// Java 7 examples
// Reducing a KGroupedStream
KTable&lt;String, Long&gt; aggregatedStream = groupedStream.reduce(
new Reducer&lt;Long&gt;() { /* adder */
@Override
public Long apply(Long aggValue, Long newValue) {
return aggValue + newValue;
}
},
"reduced-stream-store" /* state store name */);
// Reducing a KGroupedTable
KTable&lt;String, Long&gt; aggregatedTable = groupedTable.reduce(
new Reducer&lt;Long&gt;() { /* adder */
@Override
public Long apply(Long aggValue, Long newValue) {
return aggValue + newValue;
}
},
new Reducer&lt;Long&gt;() { /* subtractor */
@Override
public Long apply(Long aggValue, Long oldValue) {
return aggValue - oldValue;
}
},
"reduced-table-store" /* state store name */);
</pre>
<p>
Detailed behavior for <code>KGroupedStream</code>:
</p>
<ul>
<li>Input records with <code>null</code> keys are ignored in general.</li>
<li>When a record key is received for the first time, then the value of that
record is used as the initial aggregate value.</li>
<li>Whenever a record with a non-<code>null</code> value is received, the adder is called.</li>
</ul>
<p>
Detailed behavior for <code>KGroupedTable</code>:
</p>
<ul>
<li>Input records with <code>null</code> keys are ignored in general.</li>
<li>When a record key is received for the first time, then the value of that
record is used as the initial aggregate value.
Note that, in contrast to <code>KGroupedStream</code>, over time this initialization step
may happen more than once for a key as a
result of having received input tombstone records for that key (see below).</li>
<li>When the first non-<code>null</code> value is received for a key (think: INSERT), then
only the adder is called.</li>
<li>When subsequent non-<code>null</code> values are received for a key (think: UPDATE), then
(1) the subtractor is called with the
old value as stored in the table and (2) the adder is called with the new
value of the input record that was just received.
The order of execution for the subtractor and adder is not defined.</li>
<li>When a tombstone record -- i.e. a record with a <code>null</code> value -- is received
for a key (think: DELETE), then only the
subtractor is called. Note that, whenever the subtractor returns a <code>null</code>
value itself, then the corresponding key
is removed from the resulting KTable. If that happens, any next input
record for that key will re-initialize its aggregate value.</li>
</ul>
<p>
See the example at the bottom of this section for a visualization of the
aggregation semantics.
<p>
</td>
</tr>
<tr>
<td><b>Reduce (windowed)</b>: <code>KGroupedStream &rarr; KTable</code></td>
<td>
<p>
<b>Windowed aggregation</b>. Combines the values of records, per window, by the grouped key. The current record value
is combined with the last reduced value, and a new reduced value is returned. Records with <code>null</code> key or value are
ignored. The result value type cannot be changed, unlike <code>aggregate</code>.
</p>
<p>
The windowed reduce turns a <code>KGroupedStream&lt;K, V&gt;</code> into a windowed <code>KTable&lt;Windowed&lt;K&gt;, V&gt;</code>.
</p>
<p>
Several variants of reduce exist, see Javadocs for details.
</p>
<pre class="brush: java;">
import java.util.concurrent.TimeUnit;
KGroupedStream&lt;String, Long&gt; groupedStream = ...;
// Java 8+ examples, using lambda expressions
// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream.reduce(
(aggValue, newValue) -> aggValue + newValue, /* adder */
TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */
"time-windowed-reduced-stream-store" /* state store name */);
// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionizedAggregatedStream = groupedStream.reduce(
(aggValue, newValue) -> aggValue + newValue, /* adder */
SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */
"sessionized-reduced-stream-store" /* state store name */);
// Java 7 examples
// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream.reduce(
new Reducer&lt;Long&gt;() { /* adder */
@Override
public Long apply(Long aggValue, Long newValue) {
return aggValue + newValue;
}
},
TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */
"time-windowed-reduced-stream-store" /* state store name */);
// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionizedAggregatedStream = groupedStream.reduce(
new Reducer&lt;Long&gt;() { /* adder */
@Override
public Long apply(Long aggValue, Long newValue) {
return aggValue + newValue;
}
},
SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */
"sessionized-reduced-stream-store" /* state store name */);
</pre>
<p>
Detailed behavior:
</p>
<ul>
<li>The windowed reduce behaves similarly to the rolling reduce described above. The additional twist is that
the behavior applies per window.</li>
<li>Input records with <code>null</code> keys are ignored in general.</li>
<li>When a record key is received for the first time for a given window, then the value of that record is
used as the initial aggregate value.</li>
<li>Whenever a record with a non-<code>null</code> value is received for a given window, the adder is called. (Note: As
a result of a known bug in Kafka 0.11.0.0, the adder is currently also called for <code>null</code> values. You can work
around this, for example, by manually filtering out <code>null</code> values prior to grouping the stream.)</li>
<li>See the example at the bottom of this section for a visualization of the aggregation semantics.</li>
</ul>
</td>
</tr>
</tbody>
</table>
<p>
<b>Example of semantics for stream aggregations</b>: A <code>KGroupedStream &rarr; KTable</code> example is shown below. The streams and the table are
initially empty. We use bold font in the column for "KTable <code>aggregated</code>" to highlight changed state. An entry such as <code>(hello, 1)</code>
denotes a record with key <code>hello</code> and value <code>1</code>. To improve the readability of the semantics table we assume that all records are
processed in timestamp order.
</p>
<pre class="brush: java;">
// Key: word, value: count
KStream&lt;String, Integer&gt; wordCounts = ...;
KGroupedStream&lt;String, Integer&gt; groupedStream = wordCounts
.groupByKey(Serialized.with(Serdes.String(), Serdes.Integer()));
KTable&lt;String, Integer&gt; aggregated = groupedStream.aggregate(
() -> 0, /* initializer */
(aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
Serdes.Integer(), /* serde for aggregate value */
"aggregated-stream-store" /* state store name */);
</pre>
<p>
<b>Impact of <a href=#streams_developer-guide_memory-management_record-cache>record caches</a></b>: For illustration purposes,
the column "KTable <code>aggregated</code>" below shows the table's state changes over
time in a very granular way. In practice, you would observe state changes in such a granular way only when record caches are
disabled (default: enabled). When record caches are enabled, what might happen for example is that the output results of the
rows with timestamps 4 and 5 would be compacted, and there would only be a single state update for the key <code>kafka</code> in the KTable
(here: from <code>(kafka, 1)</code> directly to <code>(kafka, 3)</code>). Typically, you should only disable record caches for testing or debugging purposes
-- under normal circumstances it is better to leave record caches enabled.
</p>
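<p>
As a small configuration sketch (assuming the usual <code>Properties</code>-based setup of a Streams application), record caches can be disabled for such testing or debugging scenarios by setting the cache size to zero:
</p>
<pre class="brush: java;">
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
Properties streamsConfiguration = new Properties();
// ... other required settings such as application.id and bootstrap.servers ...
// Disable record caches (testing/debugging only) by allowing zero bytes for record caching
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
</pre>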
<table class="data-table" border="1">
<thead>
<col>
<colgroup span="2"></colgroup>
<colgroup span="2"></colgroup>
<col>
<tr>
<th scope="col"></th>
<th colspan="2">KStream wordCounts</th>
<th colspan="2">KGroupedStream groupedStream</th>
<th scope="col">KTable aggregated</th>
</tr>
</thead>
<tbody>
<tr>
<th scope="col">Timestamp</th>
<th scope="col">Input record</th>
<th scope="col">Grouping</th>
<th scope="col">Initializer</th>
<th scope="col">Adder</th>
<th scope="col">State</th>
</tr>
<tr>
<td>1</td>
<td>(hello, 1)</td>
<td>(hello, 1)</td>
<td>0 (for hello)</td>
<td>(hello, 0 + 1)</td>
<td>(hello, 1)</td>
</tr>
<tr>
<td>2</td>
<td>(kafka, 1)</td>
<td>(kafka, 1)</td>
<td>0 (for kafka)</td>
<td>(kafka, 0 + 1)</td>
<td>(hello, 1), (kafka, 1)</td>
</tr>
<tr>
<td>3</td>
<td>(streams, 1)</td>
<td>(streams, 1)</td>
<td>0 (for streams)</td>
<td>(streams, 0 + 1)</td>
<td>(hello, 1), (kafka, 1), (streams, 1)</td>
</tr>
<tr>
<td>4</td>
<td>(kafka, 1)</td>
<td>(kafka, 1)</td>
<td></td>
<td>(kafka, 1 + 1)</td>
<td>(hello, 1), (kafka, 2), (streams, 1)</td>
</tr>
<tr>
<td>5</td>
<td>(kafka, 1)</td>
<td>(kafka, 1)</td>
<td></td>
<td>(kafka, 2 + 1)</td>
<td>(hello, 1), (kafka, 3), (streams, 1)</td>
</tr>
<tr>
<td>6</td>
<td>(streams, 1)</td>
<td>(streams, 1)</td>
<td></td>
<td>(streams, 1 + 1)</td>
<td>(hello, 1), (kafka, 3), (streams, 2)</td>
</tr>
</tbody>
</table>
<p>
<b>Example of semantics for table aggregations</b>: A <code>KGroupedTable &rarr; KTable</code> example is shown below. The tables are initially empty.
We use bold font in the column for "KTable <code>aggregated</code>" to highlight changed state. An entry such as <code>(hello, 1)</code> denotes a
record with key <code>hello</code> and value <code>1</code>. To improve the readability of the semantics table we assume that all records are processed
in timestamp order.
</p>
<pre class="brush: java;">
// Key: username, value: user region (abbreviated to "E" for "Europe", "A" for "Asia")
KTable&lt;String, String&gt; userProfiles = ...;
// Re-group `userProfiles`. Don't read too much into what the grouping does:
// its prime purpose in this example is to show the *effects* of the grouping
// in the subsequent aggregation.
KGroupedTable&lt;String, Integer&gt; groupedTable = userProfiles
.groupBy((user, region) -> KeyValue.pair(region, user.length()), Serialized.with(Serdes.String(), Serdes.Integer()));
KTable&lt;String, Integer&gt; aggregated = groupedTable.aggregate(
() -> 0, /* initializer */
(aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
(aggKey, oldValue, aggValue) -> aggValue - oldValue, /* subtractor */
Serdes.Integer(), /* serde for aggregate value */
"aggregated-table-store" /* state store name */);
</pre>
<p>
<b>Impact of <a href=#streams_developer-guide_memory-management_record-cache>record caches</a></b>:
For illustration purposes, the column "KTable <code>aggregated</code>" below shows
the table's state changes over time in a very granular way. In practice, you would observe state changes
in such a granular way only when record caches are disabled (default: enabled). When record caches are enabled,
what might happen for example is that the output results of the rows with timestamps 4 and 5 would be
compacted, and there would only be a single state update for the key <code>kafka</code> in the KTable
(here: from <code>(kafka, 1)</code> directly to <code>(kafka, 3)</code>). Typically, you should only disable
record caches for testing or debugging purposes -- under normal circumstances it is better to leave record caches enabled.
</p>
<table class="data-table" border="1">
<thead>
<col>
<colgroup span="2"></colgroup>
<colgroup span="2"></colgroup>
<col>
<tr>
<th scope="col"></th>
<th colspan="3">KTable userProfiles</th>
<th colspan="3">KGroupedTable groupedTable</th>
<th scope="col">KTable aggregated</th>
</tr>
</thead>
<tbody>
<tr>
<th scope="col">Timestamp</th>
<th scope="col">Input record</th>
<th scope="col">Interpreted as</th>
<th scope="col">Grouping</th>
<th scope="col">Initializer</th>
<th scope="col">Adder</th>
<th scope="col">Subtractor</th>
<th scope="col">State</th>
</tr>
<tr>
<td>1</td>
<td>(alice, E)</td>
<td>INSERT alice</td>
<td>(E, 5)</td>
<td>0 (for E)</td>
<td>(E, 0 + 5)</td>
<td></td>
<td>(E, 5)</td>
</tr>
<tr>
<td>2</td>
<td>(bob, A)</td>
<td>INSERT bob</td>
<td>(A, 3)</td>
<td>0 (for A)</td>
<td>(A, 0 + 3)</td>
<td></td>
<td>(A, 3), (E, 5)</td>
</tr>
<tr>
<td>3</td>
<td>(charlie, A)</td>
<td>INSERT charlie</td>
<td>(A, 7)</td>
<td></td>
<td>(A, 3 + 7)</td>
<td></td>
<td>(A, 10), (E, 5)</td>
</tr>
<tr>
<td>4</td>
<td>(alice, A)</td>
<td>UPDATE alice</td>
<td>(A, 5)</td>
<td></td>
<td>(A, 10 + 5)</td>
<td>(E, 5 - 5)</td>
<td>(A, 15), (E, 0)</td>
</tr>
<tr>
<td>5</td>
<td>(charlie, null)</td>
<td>DELETE charlie</td>
<td>(null, 7)</td>
<td></td>
<td></td>
<td>(A, 15 - 7)</td>
<td>(A, 8), (E, 0)</td>
</tr>
<tr>
<td>6</td>
<td>(null, E)</td>
<td>ignored</td>
<td></td>
<td></td>
<td></td>
<td></td>
<td>(A, 8), (E, 0)</td>
</tr>
<tr>
<td>7</td>
<td>(bob, E)</td>
<td>UPDATE bob</td>
<td>(E, 3)</td>
<td></td>
<td>(E, 0 + 3)</td>
<td>(A, 8 - 3)</td>
<td>(A, 5), (E, 3)</td>
</tr>
</tbody>
</table>
<h6><a id="streams_dsl_windowing" href="#streams_dsl_windowing">Windowing a stream</a></h6>
A stream processor may need to divide data records into time buckets, i.e. to <b>window</b> the stream by time. This is usually needed for join and aggregation operations, etc. Kafka Streams currently defines the following types of windows:
<ul>
<li><b>Hopping time windows</b> are windows based on time intervals. They model fixed-size, (possibly) overlapping windows. A hopping window is defined by two properties: the window's size and its advance interval (aka "hop"). The advance interval specifies by how much a window moves forward relative to the previous one. For example, you can configure a hopping window with a size of 5 minutes and an advance interval of 1 minute. Since hopping windows can overlap, a data record may belong to more than one such window.</li>
<li><b>Tumbling time windows</b> are a special case of hopping time windows and, like the latter, are windows based on time intervals. They model fixed-size, non-overlapping, gap-less windows. A tumbling window is defined by a single property: the window's size. A tumbling window is a hopping window whose window size is equal to its advance interval. Since tumbling windows never overlap, a data record will belong to one and only one window.</li>
<li><b>Sliding windows</b> model a fixed-size window that slides continuously over the time axis; here, two data records are said to be included in the same window if the difference of their timestamps is within the window size. Thus, sliding windows are not aligned to the epoch, but on the data record timestamps. In Kafka Streams, sliding windows are used only for join operations, and can be specified through the <code>JoinWindows</code> class.</li>
<li><b>Session windows</b> are used to aggregate key-based events into sessions.
Sessions represent a period of activity separated by a defined gap of inactivity.
Any events processed that fall within the inactivity gap of any existing sessions are merged into the existing sessions.
If the event falls outside of the session gap, then a new session will be created.
Session windows are tracked independently across keys (e.g. windows of different keys typically have different start and end times) and their sizes vary (even windows for the same key typically have different sizes);
as such session windows can't be pre-computed and are instead derived from analyzing the timestamps of the data records.
</li>
</ul>
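<p>
To make the window types above more concrete, the following sketch shows how each of them might be specified with the DSL's window classes. The sizes, advance intervals, and inactivity gaps are arbitrary values chosen purely for illustration:
</p>
<pre class="brush: java;">
import java.util.concurrent.TimeUnit;
// Hopping time window: 5-minute windows that advance by 1 minute (windows overlap)
TimeWindows hoppingWindow = TimeWindows.of(TimeUnit.MINUTES.toMillis(5))
    .advanceBy(TimeUnit.MINUTES.toMillis(1));
// Tumbling time window: 5-minute windows; the advance interval equals the window size by default
TimeWindows tumblingWindow = TimeWindows.of(TimeUnit.MINUTES.toMillis(5));
// Sliding window: used for KStream-KStream joins; records whose timestamps differ by at most
// 5 minutes are considered for joining
JoinWindows joinWindow = JoinWindows.of(TimeUnit.MINUTES.toMillis(5));
// Session window: a session is closed after 5 minutes of inactivity for a key
SessionWindows sessionWindow = SessionWindows.with(TimeUnit.MINUTES.toMillis(5));
</pre>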
<p>
In the Kafka Streams DSL users can specify a <b>retention period</b> for the window. This allows Kafka Streams to retain old window buckets for a period of time in order to wait for the late arrival of records whose timestamps fall within the window interval.
If a record arrives after the retention period has passed, the record cannot be processed and is dropped.
</p>
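<p>
As a small sketch of how a retention period might be specified (the concrete durations below are illustrative only), the <code>until</code> method of a window specification sets how long Kafka Streams retains a window's buckets:
</p>
<pre class="brush: java;">
import java.util.concurrent.TimeUnit;
// 5-minute tumbling windows whose buckets are retained for one hour, so records arriving
// up to one hour late (relative to their window) can still be processed.
TimeWindows windowSpec = TimeWindows.of(TimeUnit.MINUTES.toMillis(5))
    .until(TimeUnit.HOURS.toMillis(1));
</pre>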
<p>
Late-arriving records are always possible in real-time data streams. However, it depends on the effective <a href="/{{version}}/documentation/streams/core-concepts#streams_time">time semantics</a> how late records are handled. Using processing-time, the semantics are "when the data is being processed",
which means that the notion of late records is not applicable as, by definition, no record can be late. Hence, late-arriving records only really can be considered as such (i.e. as arriving "late") for event-time or ingestion-time semantics. In both cases,
Kafka Streams is able to properly handle late-arriving records.
</p>
<h6><a id="streams_dsl_joins" href="#streams_dsl_joins">Join multiple streams</a></h6>
A <b>join</b> operation merges two streams based on the keys of their data records, and yields a new stream. A join over record streams usually needs to be performed on a windowing basis because otherwise the number of records that must be maintained for performing the join may grow indefinitely. In Kafka Streams, you may perform the following join operations:
<ul>
<li><b>KStream-to-KStream Joins</b> are always windowed joins, since otherwise the memory and state required to compute the join would grow infinitely in size. Here, a newly received record from one of the streams is joined with the other stream's records within the specified window interval to produce one result for each matching pair based on user-provided <code>ValueJoiner</code>. A new <code>KStream</code> instance representing the result stream of the join is returned from this operator.</li>
<li><b>KTable-to-KTable Joins</b> are join operations designed to be consistent with the ones in relational databases. Here, both changelog streams are materialized into local state stores first. When a new record is received from one of the streams, it is joined with the other stream's materialized state stores to produce one result for each matching pair based on user-provided ValueJoiner. A new <code>KTable</code> instance representing the result stream of the join, which is also a changelog stream of the represented table, is returned from this operator.</li>
<li><b>KStream-to-KTable Joins</b> allow you to perform table lookups against a changelog stream (<code>KTable</code>) upon receiving a new record from another record stream (<code>KStream</code>). An example use case would be to enrich a stream of user activities (<code>KStream</code>) with the latest user profile information (<code>KTable</code>). Only records received from the record stream will trigger the join and produce results via <code>ValueJoiner</code>, not vice versa (i.e., records received from the changelog stream will be used only to update the materialized state store). A new <code>KStream</code> instance representing the result stream of the join is returned from this operator.</li>
<li><b>KStream-to-GlobalKTable Joins</b> allow you to perform table lookups against a fully replicated changelog stream (<code>GlobalKTable</code>) upon receiving a new record from another record stream (<code>KStream</code>).
Joins with a <code>GlobalKTable</code> don't require repartitioning of the input <code>KStream</code> as all partitions of the <code>GlobalKTable</code> are available on every KafkaStreams instance.
The <code>KeyValueMapper</code> provided with the join operation is applied to each <code>KStream</code> record to extract the join key that is used for the lookup against the <code>GlobalKTable</code>, so non-record-key joins are possible.
An example use case would be to enrich a stream of user activities (<code>KStream</code>) with the latest user profile information (<code>GlobalKTable</code>).
Only records received from the record stream will trigger the join and produce results via <code>ValueJoiner</code>, not vice versa (i.e., records received from the changelog stream will be used only to update the materialized state store).
A new <code>KStream</code> instance representing the result stream of the join is returned from this operator.</li>
</ul>
Depending on the operands the following join operations are supported: <b>inner joins</b>, <b>outer joins</b> and <b>left joins</b>.
Their <a href="https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics">semantics</a> are similar to the corresponding operators in relational databases.
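<p>
As a brief illustration of the join API (the input streams, value types, and enrichment logic below are hypothetical), a KStream-to-KTable left join that enriches a click stream with user region information might look as follows:
</p>
<pre class="brush: java;">
// Java 8+ example, using lambda expressions
KStream&lt;String, Long&gt; userClicks = ...;   // key: user id, value: number of clicks
KTable&lt;String, String&gt; userRegions = ...; // key: user id, value: home region
// Enrich the click stream with the user's region; if no region is known for a user,
// the ValueJoiner falls back to "UNKNOWN".
KStream&lt;String, String&gt; clicksWithRegion = userClicks.leftJoin(
    userRegions,
    (clicks, region) -> (region == null ? "UNKNOWN" : region) + "/" + clicks /* ValueJoiner */);
</pre>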
<h4><a id="streams_dsl_sink" href="#streams_dsl_sink">Write streams back to Kafka</a></h4>
<p>
At the end of the processing, users can choose to (continuously) write the resulting streams back to a Kafka topic through
<code>KStream.to</code> and <code>KTable.to</code>.
</p>
<pre class="brush: java;">
joined.to("topic4");
// or using custom Serdes and a StreamPartitioner
joined.to("topic5", Produced.with(keySerde, valueSerde, myStreamPartitioner));
</pre>
If your application needs to continue reading and processing the records after they have been materialized
to a topic via <code>to</code> above, one option is to construct a new stream that reads from the output topic;
Kafka Streams provides a convenience method called <code>through</code>:
<pre class="brush: java;">
// equivalent to
//
// joined.to("topic4");
// materialized = builder.stream("topic4");
KStream&lt;String, String&gt; materialized = joined.through("topic4");
// if you need to provide serdes or a custom StreamPartitioner you can use
// the overloaded version
KStream&lt;String, String&gt; materialized = joined.through("topic5",
Produced.with(keySerde, valueSerde, myStreamPartitioner));
</pre>
<br>
<h4><a id="streams_dsl_build" href="#streams_dsl_build">Generate the processor topology</a></h4>
<p>
Within the Streams DSL, while users are specifying the operations to create / transform various streams as described above, a <code>Topology</code> is constructed implicitly within the <code>StreamsBuilder</code>.
Users can generate the constructed topology at any given point in time by calling <code>build</code>:
</p>
<pre class="brush: java;">
Topology topology = builder.build();
</pre>
<p>
Users can investigate the generated <code>Topology</code> via its <code>describe</code> API, and continue building or modifying the topology until they are satisfied with it.
The topology can then be used to execute the application (we will talk about this later in this section).
</p>
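<p>
For example, a simple way to inspect the generated topology during development is to print its description (the exact output format is an implementation detail):
</p>
<pre class="brush: java;">
// Print a human-readable summary of the topology's sources, processors, and sinks
System.out.println(topology.describe());
</pre>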
<h3><a id="streams_interactive_queries" href="#streams_interactive_queries">Interactive Queries</a></h3>
<p>
Interactive queries let you get more from streaming than just the processing of data. This feature allows you to treat the stream processing layer as a lightweight embedded database and, more concretely, <i>to directly query the latest state</i> of your stream processing application, without needing to materialize that state to external databases or external storage first.
As a result, interactive queries simplify the architecture of many use cases and lead to more application-centric architectures. For example, you often no longer need to operate and interface with a separate database cluster -- or a separate infrastructure team in your company that runs that cluster -- to share data between a Kafka Streams application (say, an event-driven microservice) and downstream applications, regardless of whether these applications use Kafka Streams or not; they may even be applications that do not run on the JVM, e.g. implemented in Python, C/C++, or JavaScript.
The following diagrams juxtapose two architectures: the first does not use interactive queries whereas the second architecture does. It depends on the concrete use case to determine which of these architectures is a better fit -- the important takeaway is that Kafka Streams and interactive queries give you the flexibility to pick and to compose the right one, rather than limiting you to just a single way.
</p>
<figure>
<img class="centered" src="/{{version}}/images/streams-interactive-queries-01.png" style="width:600pt;">
<figcaption style="text-align: center;"><i>Without interactive queries: increased complexity and heavier footprint of architecture</i></figcaption>
</figure>
<figure>
<img class="centered" src="/{{version}}/images/streams-interactive-queries-02.png" style="width:500pt;">
<figcaption style="text-align: center;"><i>With interactive queries: simplified, more application-centric architecture</i></figcaption>
</figure>
<p>
Here are some use case examples for applications that benefit from interactive queries:
</p>
<ul>
<li>Real-time monitoring: A front-end dashboard that provides threat intelligence (e.g., web servers currently
under attack by cyber criminals) can directly query a Kafka Streams application that continuously generates the
relevant information by processing network telemetry data in real-time.
</li>
<li>Video gaming: A Kafka Streams application continuously tracks location updates from players in the gaming universe.
A mobile companion app can then directly query the Kafka Streams application to show the current location of a player
to friends and family, and invite them to come along. Similarly, the game vendor can use the data to identify unusual
hotspots of players, which may indicate a bug or an operational issue.
</li>
<li>Risk and fraud: A Kafka Streams application continuously analyzes user transactions for anomalies and suspicious
behavior. An online banking application can directly query the Kafka Streams application when a user logs in to deny
access to those users that have been flagged as suspicious.
</li>
<li>Trend detection: A Kafka Streams application continuously computes the latest top charts across music genres based on
user listening behavior that is collected in real-time. Mobile or desktop applications of a music store can then
interactively query for the latest charts while users are browsing the store.
</li>
</ul>
<h4><a id="streams_developer-guide_interactive-queries_your_app" href="#streams_developer-guide_interactive-queries_your_app">Your application and interactive queries</a></h4>
<p>
Interactive queries allow you to tap into the <i>state</i> of your application, and notably to do that from outside your application.
However, an application is not interactively queryable out of the box: you make it queryable by leveraging the API of Kafka Streams.
</p>
<p>
It is important to understand that the state of your application -- to be extra clear, we might call it "the full state of the entire application" -- is typically split across many distributed instances of your application, and thus across many state stores that are managed locally by these application instances.
</p>
<img class="centered" src="/{{version}}/images/streams-interactive-queries-03.png" style="width:400pt; height:400pt;">
<p>
Accordingly, the API to let you interactively query your application's state has two parts, a <i>local</i> and a <i>remote</i> one:
</p>
<ol>
<li><a href="#streams_developer-guide_interactive-queries_local-stores">Querying local state stores (for an application instance)</a>: You can query that (part of the full) state that is managed locally by an instance of your application. Here, an application instance can directly query its own local state stores. You can thus use the corresponding (local) data in other parts of your application code that are not related to calling the Kafka Streams API. Querying state stores is always *read-only* to guarantee that the underlying state stores will never be mutated out-of-band, e.g. you cannot add new entries; state stores should only ever be mutated by the corresponding processor topology and the input data it operates on.
</li>
<li><a href="#streams_developer-guide_interactive-queries_discovery">Querying remote state stores (for the entire application)</a>: To query the full state of your entire application we must be able to piece together the various local fragments of the state. In addition to being able to (a) query local state stores as described in the previous bullet point, we also need to (b) discover all the running instances of your application in the network, including their respective state stores and (c) have a way to communicate with these instances over the network, i.e. an RPC layer. Collectively, these building blocks enable intra-app communcation (between instances of the same app) as well as inter-app communication (from other applications) for interactive queries.
</li>
</ol>
<table class="data-table">
<tbody>
<tr>
<th>What of the below is required to access the state of ...</th>
<th>... an app instance (local state)</th>
<th>... the entire application (full state)</th>
</tr>
<tr>
<td>Query local state stores of an app instance</td><td>Required (but already built-in)</td><td>Required (but already built-in)</td>
</tr>
<tr>
<td>Make an app instance discoverable to others</td><td>Not needed</td><td>Required (but already built-in)</td>
</tr>
<tr>
<td>Discover all running app instances and their state stores</td><td>Not needed</td><td>Required (but already built-in)</td>
</tr>
<tr>
<td>Communicate with app instances over the network (RPC)</td><td>Not needed</td><td>Required (<b>user must provide</b>)</td>
</tr>
</tbody>
</table>
<p>
Kafka Streams provides all the required functionality for interactively querying your application's state out of the box, with but one exception: if you want to expose your application's full state via interactive queries, then --
for reasons we explain further down below -- it is your responsibility to add an appropriate RPC layer (such as a REST
API) to your application that allows application instances to communicate over the network. If, however, you only need
to let your application instances access their own local state, then you do not need to add such an RPC layer at all.
</p>
<h4><a id="streams_developer-guide_interactive-queries_local-stores" href="#streams_developer-guide_interactive-queries_local-stores">Querying local state stores (for an application instance)</a></h4>
<p>
A Kafka Streams application is typically running on many instances.
The state that is locally available on any given instance is only a subset of the application's entire state.
Querying the local stores on an instance will, by definition, <i>only return data locally available on that particular instance</i>.
We explain how to access data in state stores that are not locally available in section <a href="#streams_developer-guide_interactive-queries_discovery"><b>Querying remote state stores</b></a> (for the entire application).
</p>
<p>
The method <code>KafkaStreams#store(...)</code> finds an application instance's local state stores <i>by name</i> and <i>by type</i>.
</p>
<figure>
<img class="centered" src="/{{version}}/images/streams-interactive-queries-api-01.png" style="width:500pt;">
<figcaption style="text-align: center;"><i>Every application instance can directly query any of its local state stores</i></figcaption>
</figure>
<p>
The <i>name</i> of a state store is defined when you are creating the store, either when creating the store explicitly (e.g. when using the Processor API) or when creating the store implicitly (e.g. when using stateful operations in the DSL).
We show examples of how to name a state store further down below.
</p>
<p>
The <i>type</i> of a state store is defined by <code>QueryableStoreType</code>, and you can access the built-in types via the class <code>QueryableStoreTypes</code>.
Kafka Streams currently has two built-in types:
</p>
<ul>
<li>A key-value store <code>QueryableStoreTypes#keyValueStore()</code>, see <a href="#streams_developer-guide_interactive-queries_local-key-value-stores">Querying local key-value stores</a>.</li>
<li>A window store <code>QueryableStoreTypes#windowStore()</code>, see <a href="#streams_developer-guide_interactive-queries_local-window-stores">Querying local window stores</a>.</li>
</ul>
<p>
Both store types return <i>read-only</i> versions of the underlying state stores.
This read-only constraint is important to guarantee that the underlying state stores will never be mutated (e.g. new entries added) out-of-band, i.e. only the corresponding processing topology of Kafka Streams is allowed to mutate and update the state stores in order to ensure data consistency.
</p>
<p>
You can also implement your own <code>QueryableStoreType</code> as described in section <a href="#streams_developer-guide_interactive-queries_custom-stores"><b>Querying local custom state stores</b></a>.
</p>
<p>
Kafka Streams materializes one state store per stream partition, which means your application will potentially manage many underlying state stores.
The API to query local state stores enables you to query all of the underlying stores without having to know which partition the data is in.
The objects returned from <code>KafkaStreams#store(...)</code> are therefore wrapping potentially many underlying state stores.
Note that it is the caller's responsibility to close any iterators returned by a state store;
otherwise it may lead to out-of-memory errors and leaked file handles, depending on the state store implementation.
</p>
<h4><a id="streams_developer-guide_interactive-queries_local-key-value-stores" href="#streams_developer-guide_interactive-queries_local-key-value-stores">Querying local key-value stores</a></h4>
<p>
To query a local key-value store, you must first create a topology with a key-value store:
</p>
<pre class="brush: java;">
StreamsConfig config = ...;
StreamsBuilder builder = ...;
KStream&lt;String, String&gt; textLines = ...;
// Define the processing topology (here: WordCount)
KGroupedStream&lt;String, String&gt; groupedByWord = textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde));
// Create a key-value store named "CountsKeyValueStore" for the all-time word counts
groupedByWord.count("CountsKeyValueStore");
// Start an instance of the topology
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
</pre>
<p>
Above we created a key-value store named "CountsKeyValueStore".
This store will hold the latest count for any word that is found on the topic "word-count-input".
Once the application has started we can get access to "CountsKeyValueStore" and then query it via the <code>ReadOnlyKeyValueStore</code> API:
</p>
<pre class="brush: java;">
// Get the key-value store CountsKeyValueStore
ReadOnlyKeyValueStore&lt;String, Long&gt; keyValueStore =
streams.store("CountsKeyValueStore", QueryableStoreTypes.keyValueStore());
// Get value by key
System.out.println("count for hello:" + keyValueStore.get("hello"));
// Get the values for a range of keys available in this application instance
KeyValueIterator&lt;String, Long&gt; range = keyValueStore.range("all", "streams");
while (range.hasNext()) {
KeyValue&lt;String, Long&gt; next = range.next();
System.out.println("count for " + next.key + ": " + value);
}
range.close(); // close iterator to avoid memory leak
// Get the values for all of the keys available in this application instance
range = keyValueStore.all();
while (range.hasNext()) {
KeyValue&lt;String, Long&gt; next = range.next();
System.out.println("count for " + next.key + ": " + value);
}
range.close(); // close iterator to avoid memory leak
</pre>
<h4><a id="streams_developer-guide_interactive-queries_local-window-stores" href="#streams_developer-guide_interactive-queries_local-window-stores">Querying local window stores</a></h4>
<p>
A window store differs from a key-value store in that you will potentially have many results for any given key because the key can be present in multiple windows.
However, there will only ever be at most one result per window for a given key.
</p>
<p>
To query a local window store, you must first create a topology with a window store:
</p>
<pre class="brush: java;">
StreamsConfig config = ...;
StreamsBuilder builder = ...;
KStream&lt;String, String&gt; textLines = ...;
// Define the processing topology (here: WordCount)
KGroupedStream&lt;String, String&gt; groupedByWord = textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde));
// Create a window state store named "CountsWindowStore" that contains the word counts for every minute
groupedByWord.count(TimeWindows.of(60000), "CountsWindowStore");
</pre>
<p>
Above we created a window store named "CountsWindowStore" that contains the counts for words in 1-minute windows.
Once the application has started we can get access to "CountsWindowStore" and then query it via the <code>ReadOnlyWindowStore</code> API:
</p>
<pre class="brush: java;">
// Get the window store named "CountsWindowStore"
ReadOnlyWindowStore&lt;String, Long&gt; windowStore =
streams.store("CountsWindowStore", QueryableStoreTypes.windowStore());
// Fetch values for the key "world" for all of the windows available in this application instance.
// To get *all* available windows we fetch windows from the beginning of time until now.
long timeFrom = 0; // beginning of time = oldest available
long timeTo = System.currentTimeMillis(); // now (in processing-time)
WindowStoreIterator&lt;Long&gt; iterator = windowStore.fetch("world", timeFrom, timeTo);
while (iterator.hasNext()) {
KeyValue&lt;Long, Long&gt; next = iterator.next();
long windowTimestamp = next.key;
System.out.println("Count of 'world' @ time " + windowTimestamp + " is " + next.value);
}
iterator.close();
</pre>
<h4><a id="streams_developer-guide_interactive-queries_custom-stores" href="#streams_developer-guide_interactive-queries_custom-stores">Querying local custom state stores</a></h4>
<p>
Any custom state stores you use in your Kafka Streams applications can also be queried.
However there are some interfaces that will need to be implemented first:
</p>
<ol>
<li>Your custom state store must implement <code>StateStore</code>.</li>
<li>You should have an interface to represent the operations available on the store.</li>
<li>It is recommended that you also provide an interface that restricts access to read-only operations so users of this API can't mutate the state of your running Kafka Streams application out-of-band.</li>
<li>You also need to provide an implementation of <code>StoreSupplier</code> for creating instances of your store.</li>
</ol>
<p>
The class/interface hierarchy for your custom store might look something like:
</p>
<pre class="brush: java;">
public class MyCustomStore&lt;K,V&gt; implements StateStore, MyWriteableCustomStore&lt;K,V&gt; {
// implementation of the actual store
}
// Read-write interface for MyCustomStore
public interface MyWriteableCustomStore&lt;K,V&gt; extends MyReadableCustomStore&lt;K,V&gt; {
void write(K key, V value);
}
// Read-only interface for MyCustomStore
public interface MyReadableCustomStore&lt;K,V&gt; {
V read(K key);
}
public class MyCustomStoreSupplier implements StoreSupplier {
// implementation of the supplier for MyCustomStore
}
</pre>
<p>
To make this store queryable you need to:
</p>
<ul>
<li>Provide an implementation of <code>QueryableStoreType</code>.</li>
<li>Provide a wrapper class that will have access to all of the underlying instances of the store and will be used for querying.</li>
</ul>
<p>
Implementing <code>QueryableStoreType</code> is straight forward:
</p>
<pre class="brush: java;">
public class MyCustomStoreType&lt;K,V&gt; implements QueryableStoreType&lt;MyReadableCustomStore&lt;K,V&gt;&gt; {
// Only accept StateStores that are of type MyCustomStore
public boolean accepts(final StateStore stateStore) {
return stateStore instanceof MyCustomStore;
}
public MyReadableCustomStore&lt;K,V&gt; create(final StateStoreProvider storeProvider, final String storeName) {
return new MyCustomStoreTypeWrapper(storeProvider, storeName, this);
}
}
</pre>
<p>
A wrapper class is required because even a single instance of a Kafka Streams application may run multiple stream tasks and, by doing so, manage multiple local instances of a particular state store.
The wrapper class hides this complexity and lets you query a "logical" state store with a particular name without having to know about all of the underlying local instances of that state store.
</p>
<p>
When implementing your wrapper class you will need to make use of the <code>StateStoreProvider</code>
interface to get access to the underlying instances of your store.
<code>StateStoreProvider#stores(String storeName, QueryableStoreType&lt;T&gt; queryableStoreType)</code> returns a <code>List</code> of state stores with the given <code>storeName</code> and of the type as defined by <code>queryableStoreType</code>.
</p>
<p>
An example implementation of the wrapper follows (Java 8+):
</p>
<pre class="brush: java;">
// We strongly recommended implementing a read-only interface
// to restrict usage of the store to safe read operations!
public class MyCustomStoreTypeWrapper&lt;K,V&gt; implements MyReadableCustomStore&lt;K,V&gt; {
private final QueryableStoreType&lt;MyReadableCustomStore&lt;K, V&gt;&gt; customStoreType;
private final String storeName;
private final StateStoreProvider provider;
public MyCustomStoreTypeWrapper(final StateStoreProvider provider,
final String storeName,
final QueryableStoreType&lt;MyReadableCustomStore&lt;K, V&gt;&gt; customStoreType) {
// ... assign fields ...
}
// Implement a safe read method
@Override
public V read(final K key) {
// Get all the stores with storeName and of customStoreType
final List&lt;MyReadableCustomStore&lt;K, V&gt;&gt; stores = provider.stores(storeName, customStoreType);
// Try and find the first non-null value for the given key across all underlying store instances
final Optional&lt;V&gt; value = stores.stream()
.map(store -> store.read(key))
.filter(v -> v != null)
.findFirst();
// Return the value if it exists
return value.orElse(null);
}
}
</pre>
<p>
Putting it all together you can now find and query your custom store:
</p>
<pre class="brush: java;">
StreamsConfig config = ...;
Topology topology = ...;
ProcessorSupplier processorSupplier = ...;
// Create CustomStoreSupplier for store name the-custom-store
MyCustomStoreSupplier customStoreSupplier = new MyCustomStoreSupplier("the-custom-store");
// Add the source topic
topology.addSource("input", "inputTopic");
// Add a custom processor that reads from the source topic
topology.addProcessor("the-processor", processorSupplier, "input");
// Connect your custom state store to the custom processor above
topology.addStateStore(customStoreSupplier, "the-processor");
KafkaStreams streams = new KafkaStreams(topology, config);
streams.start();
// Get access to the custom store
MyReadableCustomStore&lt;String,String&gt; store = streams.store("the-custom-store", new MyCustomStoreType&lt;String,String&gt;());
// Query the store
String value = store.read("key");
</pre>
<h4><a id="streams_developer-guide_interactive-queries_discovery" href="#streams_developer-guide_interactive-queries_discovery">Querying remote state stores (for the entire application)</a></h4>
<p>
Typically, the ultimate goal for interactive queries is not to just query locally available state stores from within an instance of a Kafka Streams application as described in the previous section.
Rather, you want to expose the application's full state (i.e. the state across all its instances) to other applications that might be running on different machines.
For example, you might have a Kafka Streams application that processes the user events in a multi-player video game, and you want to retrieve the latest status of each user directly from this application so that you can display it in a mobile companion app.
</p>
<p>
Three steps are needed to make the full state of your application queryable:
</p>
<ol>
<li>You must <a href="#streams_developer-guide_interactive-queries_rpc-layer">add an RPC layer to your application</a> so that the instances of your application may be interacted with via the network -- notably to respond to interactive queries.
By design Kafka Streams does not provide any such RPC functionality out of the box so that you can freely pick your favorite approach: a REST API, Thrift, a custom protocol, and so on.</li>
<li>You need to <a href="#streams_developer-guide_interactive-queries_expose-rpc">expose the respective RPC endpoints</a> of your application's instances via the <code>application.server</code> configuration setting of Kafka Streams.
Because RPC endpoints must be unique within a network, each instance will have its own value for this configuration setting.
This makes an application instance discoverable by other instances.</li>
<li> In the RPC layer, you can then <a href="#streams_developer-guide_interactive-queries_discover-app-instances-and-stores">discover remote application instances</a> and their respective state stores (e.g. for forwarding queries to other app instances if an instance lacks the local data to respond to a query) as well as <a href="#streams_developer-guide_interactive-queries_local-stores">query locally available state stores</a> (in order to directly respond to queries) in order to make the full state of your application queryable.</li>
</ol>
<figure>
<img class="centered" src="/{{version}}/images/streams-interactive-queries-api-02.png" style="width:500pt;">
<figcaption style="text-align: center;"><i>Discover any running instances of the same application as well as the respective RPC endpoints they expose for interactive queries</i></figcaption>
</figure>
<h4><a id="streams_developer-guide_interactive-queries_rpc-layer" href="#streams_developer-guide_interactive-queries_rpc-layer">Adding an RPC layer to your application</a></h4>
<p>
As Kafka Streams doesn't provide an RPC layer you are free to choose your favorite approach.
There are many ways of doing this, and it will depend on the technologies you have chosen to use.
The only requirements are that the RPC layer is embedded within the Kafka Streams application and that it exposes an endpoint that other application instances and applications can connect to.
</p>
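<p>
For example (a minimal sketch only, not a recommendation for any particular technology), you could embed the JDK's built-in <code>HttpServer</code> in your application and serve lookups against a local key-value store. The store name "word-count" and the URL path used below are assumptions for illustration, mirroring the example in the following sections:
</p>
<pre class="brush: java;">
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import com.sun.net.httpserver.HttpServer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class WordCountRpcLayer {
    // Start a very simple HTTP endpoint that answers GET /word-count/&lt;word&gt;
    // by looking up the word in this instance's local "word-count" store.
    public static void startRestProxy(final KafkaStreams streams, final int port) throws Exception {
        final HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/word-count/", exchange -> {
            final String word = exchange.getRequestURI().getPath().replace("/word-count/", "");
            final ReadOnlyKeyValueStore&lt;String, Long&gt; store =
                streams.store("word-count", QueryableStoreTypes.&lt;String, Long&gt;keyValueStore());
            final Long count = store.get(word);
            final byte[] body = String.valueOf(count).getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(count == null ? 404 : 200, body.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(body);
            }
        });
        server.start();
    }
}
</pre>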
<h4><a id="streams_developer-guide_interactive-queries_expose-rpc" href="#streams_developer-guide_interactive-queries_expose-rpc">Exposing the RPC endpoints of your application</a></h4>
<p>
To enable the remote discovery of state stores running within a (typically distributed) Kafka Streams application you need to set the <code>application.server</code> configuration property in <code>StreamsConfig</code>.
The <code>application.server</code> property defines a unique <code>host:port</code> pair that points to the RPC endpoint of the respective instance of a Kafka Streams application.
It's important to understand that the value of this configuration property varies across the instances of your application.
When this property is set, Kafka Streams will, for every instance of the application, keep track of the instance's RPC endpoint information, its state stores, and its assigned stream partitions through instances of <code>StreamsMetadata</code>.
</p>
<p>
Below is an example of configuring and running a Kafka Streams application that supports the discovery of its state stores.
</p>
<pre class="brush: java;">
Properties props = new Properties();
// Set the unique RPC endpoint of this application instance through which it
// can be interactively queried. In a real application, the value would most
// probably not be hardcoded but derived dynamically.
String rpcEndpoint = "host1:4460";
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, rpcEndpoint);
// ... further settings may follow here ...
StreamsConfig config = new StreamsConfig(props);
StreamsBuilder builder = new StreamsBuilder();
Serde&lt;String&gt; stringSerde = Serdes.String();
KStream&lt;String, String&gt; textLines = builder.stream("word-count-input", Consumed.with(stringSerde, stringSerde));
KGroupedStream&lt;String, String&gt; groupedByWord = textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde));
// This call to `count()` creates a state store named "word-count".
// The state store is discoverable and can be queried interactively.
groupedByWord.count("word-count");
// Start an instance of the topology
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
// Then, create and start the actual RPC service for remote access to this
// application instance's local state stores.
//
// This service should be started on the same host and port as defined above by
// the property `StreamsConfig.APPLICATION_SERVER_CONFIG`. The example below is
// fictitious, but we provide end-to-end demo applications (such as KafkaMusicExample)
// that showcase how to implement such a service to get you started.
MyRPCService rpcService = ...;
rpcService.listenAt(rpcEndpoint);
</pre>
<h4><a id="streams_developer-guide_interactive-queries_discover-app-instances-and-stores" href="#streams_developer-guide_interactive-queries_discover-app-instances-and-stores">Discovering and accessing application instances and their respective local state stores</a></h4>
<p>
With the <code>application.server</code> property set, we can now find the locations of remote app instances and their state stores.
The following methods return <code>StreamsMetadata</code> objects, which provide meta-information about application instances such as their RPC endpoint and locally available state stores.
</p>
<ul>
<li><code>KafkaStreams#allMetadata()</code>: find all instances of this application</li>
<li><code>KafkaStreams#allMetadataForStore(String storeName)</code>: find those application instances that manage local instances of the state store "storeName"</li>
<li><code>KafkaStreams#metadataForKey(String storeName, K key, Serializer&lt;K&gt; keySerializer)</code>: using the default stream partitioning strategy, find the one application instance that holds the data for the given key in the given state store</li>
<li><code>KafkaStreams#metadataForKey(String storeName, K key, StreamPartitioner&lt;K, ?&gt; partitioner)</code>: using <code>partitioner</code>, find the one application instance that holds the data for the given key in the given state store</li>
</ul>
<p>
If <code>application.server</code> is not configured for an application instance, then the above methods will not find any <code>StreamsMetadata</code> for it.
</p>
<p>
For example, we can now find the <code>StreamsMetadata</code> for the state store named "word-count" that we defined in the code example shown in the previous section:
</p>
<pre class="brush: java;">
KafkaStreams streams = ...;
// Find all the locations of local instances of the state store named "word-count"
Collection&lt;StreamsMetadata&gt; wordCountHosts = streams.allMetadataForStore("word-count");
// For illustrative purposes, we assume the use of an HTTP client to talk to remote app instances.
HttpClient http = ...;
// Get the word count for word (aka key) 'alice': Approach 1
//
// We first find the one app instance that manages the count for 'alice' in its local state stores.
StreamsMetadata metadata = streams.metadataForKey("word-count", "alice", Serdes.String().serializer());
// Then, we query only that single app instance for the latest count of 'alice'.
// Note: The RPC URL shown below is fictitious and only serves to illustrate the idea. Ultimately,
// the URL (or, in general, the method of communication) will depend on the RPC layer you opted to
// implement. Again, we provide end-to-end demo applications (such as KafkaMusicExample) that showcase
// how to implement such an RPC layer.
Long result = http.getLong("http://" + metadata.host() + ":" + metadata.port() + "/word-count/alice");
// Get the word count for word (aka key) 'alice': Approach 2
//
// Alternatively, we could also choose (say) a brute-force approach where we query every app instance
// until we find the one that happens to know about 'alice'.
Optional&lt;Long&gt; bruteForceResult = streams.allMetadataForStore("word-count")
.stream()
.map(streamsMetadata -> {
// Construct the (fictitious) full endpoint URL to query the current remote application instance
String url = "http://" + streamsMetadata.host() + ":" + streamsMetadata.port() + "/word-count/alice";
// Read and return the count for 'alice', if any.
return http.getLong(url);
})
.filter(s -> s != null)
.findFirst();
</pre>
<p>
At this point the full state of the application is interactively queryable:
</p>
<ul>
<li>We can discover the running instances of the application as well as the state stores they manage locally.</li>
<li>Through the RPC layer that was added to the application, we can communicate with these application instances over the network and query them for locally available state.</li>
<li>The application instances are able to serve such queries because they can directly query their own local state stores and respond via the RPC layer.</li>
<li>Collectively, this allows us to query the full state of the entire application.</li>
</ul>
<h3><a id="streams_developer-guide_memory-management" href="#streams_developer-guide_memory-management">Memory Management</a></h3>
<h4><a id="streams_developer-guide_memory-management_record-cache" href="#streams_developer-guide_memory-management_record-cache">Record caches in the DSL</a></h4>
<p>
Developers of an application using the DSL have the option to specify, for an instance of a processing topology, the
total memory (RAM) size of a record cache that is leveraged by the following <code>KTable</code> instances:
</p>
<ol>
<li>Source <code>KTable</code>, i.e. <code>KTable</code> instances that are created via <code>StreamsBuilder#table()</code> or <code>StreamsBuilder#globalTable()</code>.</li>
<li>Aggregation <code>KTable</code>, i.e. instances of <code>KTable</code> that are created as a result of aggregations.</li>
</ol>
<p>
For such <code>KTable</code> instances, the record cache is used for:
</p>
<ol>
<li>Internal caching and compacting of output records before they are written by the underlying stateful processor node to its internal state store.</li>
<li>Internal caching and compacting of output records before they are forwarded from the underlying stateful processor node to any of its downstream processor nodes.</li>
</ol>
<p>
Here is a motivating example:
</p>
<ul>
<li>Imagine the input is a <code>KStream&lt;String, Integer&gt;</code> with the records <code>&lt;A, 1&gt;, &lt;D, 5&gt;, &lt;A, 20&gt;, &lt;A, 300&gt;</code>.
Note that the focus in this example is on the records with key == <code>A</code>.
</li>
<li>
An aggregation computes the sum of record values, grouped by key, for the input above and returns a <code>KTable&lt;String, Integer&gt;</code>.
<ul>
<li><b>Without caching</b>, what is emitted for key <code>A</code> is a sequence of output records that represent changes in the
resulting aggregation table (here, the parentheses denote changes, where the left and right numbers denote the new
aggregate value and the previous aggregate value, respectively):
<code>&lt;A, (1, null)&gt;, &lt;A, (21, 1)&gt;, &lt;A, (321, 21)&gt;</code>.</li>
<li>
<b>With caching</b>, the aforementioned three output records for key <code>A</code> would likely be compacted in the cache,
leading to a single output record <code>&lt;A, (321, null)&gt;</code> that is written to the aggregation's internal state store
and forwarded to any downstream operations.
</li>
</ul>
</li>
</ul>
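<p>
The aggregation described in the example above could be written as follows. This is only a sketch: the input topic name and the serdes are assumptions and not part of the example itself.
</p>
<pre class="brush: java;">
StreamsBuilder builder = new StreamsBuilder();
// Hypothetical input topic carrying the &lt;String, Integer&gt; records from the example
KStream&lt;String, Integer&gt; input = builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.Integer()));
// Sum the values per key. With caching enabled, consecutive updates for the same key
// (e.g. key A) are compacted in the record cache before they are written to the
// state store and forwarded downstream.
KTable&lt;String, Integer&gt; sums = input
    .groupByKey(Serialized.with(Serdes.String(), Serdes.Integer()))
    .reduce((aggValue, newValue) -> aggValue + newValue);
</pre>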
<p>
The cache size is specified through the <code>cache.max.bytes.buffering</code> parameter, which is a global setting per processing topology:
</p>
<pre class="brush: java;">
// Enable record cache of size 10 MB.
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
</pre>
<p>
This parameter controls the number of bytes allocated for caching.
Specifically, for a processor topology instance with <code>T</code> threads and <code>C</code> bytes allocated for caching,
each thread will get an even share of <code>C/T</code> bytes to construct its own cache and use as it sees fit among its tasks.
That is, there are as many caches as there are threads, and caches are not shared across threads.
The basic API for the cache is made of <code>put()</code> and <code>get()</code> calls.
Records are evicted using a simple LRU scheme once the cache size is reached.
The first time a keyed record <code>R1 = &lt;K1, V1&gt;</code> finishes processing at a node, it is marked as dirty in the cache.
Any other keyed record <code>R2 = &lt;K1, V2&gt;</code> with the same key <code>K1</code> that is processed on that node during that time will overwrite <code>&lt;K1, V1&gt;</code>, which we also refer to as "being compacted".
Note that this has the same effect as <a href="https://kafka.apache.org/documentation.html#compaction">Kafka's log compaction</a>, but happens (a) earlier, while the
records are still in memory, and (b) within your client-side application rather than on the server-side aka the Kafka broker.
Upon flushing <code>R2</code> is (1) forwarded to the next processing node and (2) written to the local state store.
</p>
<p>
The semantics of caching is that data is flushed to the state store and forwarded to the next downstream processor node
whenever the commit interval (<code>commit.interval.ms</code>) elapses or the cache-size limit (<code>cache.max.bytes.buffering</code>, i.e. cache pressure) is reached, whichever comes first.
Both <code>commit.interval.ms</code> and <code>cache.max.bytes.buffering</code> are <b>global</b> parameters: they apply to all processor nodes in
the topology, i.e., it is not possible to specify different parameters for each node.
Below we provide some example settings for both parameters based on desired scenarios.
</p>
<p>To turn off caching, the cache size can be set to zero:</p>
<pre class="brush: java;">
// Disable record cache
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
</pre>
<p>
Turning off caching might result in high write traffic for the underlying RocksDB store.
With default settings caching is enabled within Kafka Streams but RocksDB caching is disabled.
Thus, to avoid high write traffic it is recommended to enable RocksDB caching if Kafka Streams caching is turned off.
</p>
<p>
For example, the RocksDB Block Cache could be set to 100MB and Write Buffer size to 32 MB.
</p>
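<p>
One way to apply such settings (shown here as a sketch only) is to provide a custom <code>RocksDBConfigSetter</code> and register it via the <code>rocksdb.config.setter</code> parameter:
</p>
<pre class="brush: java;">
import java.util.Map;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

public class CustomRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options, final Map&lt;String, Object&gt; configs) {
        // Set the RocksDB block cache to 100 MB
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(100 * 1024 * 1024L);
        options.setTableFormatConfig(tableConfig);
        // Set the write buffer size to 32 MB
        options.setWriteBufferSize(32 * 1024 * 1024L);
    }
}

// Register the config setter in your Streams configuration:
// streamsConfiguration.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);
</pre>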
<p>
To enable caching but still have an upper bound on how long records will be cached, the commit interval can be set
appropriately (in this example, it is set to 1000 milliseconds):
</p>
<pre class="brush: java;">
Properties streamsConfiguration = new Properties();
// Enable record cache of size 10 MB.
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
// Set commit interval to 1 second.
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
</pre>
<p>
The illustration below shows the effect of these two configurations visually.
For simplicity we have records with 4 keys: blue, red, yellow and green. Without loss of generality, let's assume the cache has space for only 3 keys.
When the cache is disabled, we observe that all the input records will be output. With the cache enabled, we make the following observations.
First, most records are output at the end of a commit interval (e.g., at <code>t1</code> one blue record is output, which is the final over-write of the blue key up to that time).
Second, some records are output because of cache pressure, i.e. before the end of a commit interval (cf. the red record right before <code>t2</code>).
With smaller cache sizes we expect cache pressure to be the primary factor that dictates when records are output. With large cache sizes, the commit interval will be the primary factor.
Third, the number of records output has been reduced (here: from 15 to 8).
</p>
<img class="centered" src="/{{version}}/images/streams-cache-and-commit-interval.png" style="width:500pt;height:400pt;">
<h4><a id="streams_developer-guide_memory-management_state-store-cache" href="#streams_developer-guide_memory-management_state-store-cache">State store caches in the Processor API</a></h4>
<p>
Developers of a Kafka Streams application using the Processor API have the option to specify, for an instance of a
processing topology, the total memory (RAM) size of the <i>state store cache</i> that is used for:
</p>
<ul><li>Internal <i>caching and compacting</i> of output records before they are written from a <b>stateful</b> processor node to its state stores.</li></ul>
<p>
Note that, unlike <a href="#streams_developer-guide_memory-management_record-cache">record caches</a> in the DSL, the state
store cache in the Processor API <i>will not cache or compact</i> any output records that are being forwarded downstream.
In other words, downstream processor nodes see all records, whereas the state stores see a reduced number of records.
It is important to note that this does not impact correctness of the system but is merely a performance optimization
for the state stores.
</p>
<p>
A note on terminology: we use the narrower term <i>state store caches</i> when we refer to the Processor API and the
broader term <i>record caches</i> when we are writing about the DSL.
We made a conscious choice to not expose the more general record caches to the Processor API so that we keep it simple and flexible.
For example, developers of the Processor API might choose to store a record in a state store while forwarding a different value downstream, i.e., they
might not want to use the unified record cache for both state store and forwarding downstream.
</p>
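<p>
For illustration, here is a minimal sketch of such a processor; the store name "Counts" and the derived value that is forwarded are made up for this example:
</p>
<pre class="brush: java;">
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class StoreAndForwardProcessor extends AbstractProcessor&lt;String, Long&gt; {
    private KeyValueStore&lt;String, Long&gt; store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        super.init(context);
        // "Counts" is a hypothetical state store connected to this processor
        store = (KeyValueStore&lt;String, Long&gt;) context.getStateStore("Counts");
    }

    @Override
    public void process(final String key, final Long value) {
        // The raw value is written to the state store and thus goes through the state store cache ...
        store.put(key, value);
        // ... while a different, derived value is forwarded downstream (forwarded records are never cached).
        context().forward(key, value * 2);
    }
}
</pre>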
<p>
Following from the example first shown in section <a href="#streams_processor_statestore">State Stores</a>, to enable caching,
you first create a <code>StoreBuilder</code> and then call <code>withCachingEnabled</code> (note that caches
are disabled by default and there is no explicit <code>withCachingDisabled</code> call):
</p>
<pre class="brush: java;">
KeyValueBytesStoreSupplier countSupplier = Stores.persistentKeyValueStore("Counts");
StoreBuilder&lt;KeyValueStore&lt;String, Long&gt;&gt; builder = Stores.keyValueStoreBuilder(countSupplier, Serdes.String(), Serdes.Long());
builder.withCachingEnabled();
</pre>
<h4><a id="streams_developer-guide_memory-management_other_memory_usage" href="#streams_developer-guide_memory-management_other_memory_usage">Other memory usage</a></h4>
<p>
There are other modules inside Apache Kafka that allocate memory during runtime. They include the following:
</p>
<ul>
<li>Producer buffering, managed by the producer config <code>buffer.memory</code></li>
<li>Consumer buffering, currently not strictly managed, but can be indirectly controlled by fetch size, i.e.,
<code>fetch.max.bytes</code> and <code>fetch.max.wait.ms</code>.</li>
<li>Both producer and consumer also have separate TCP send / receive buffers that are not counted as the buffering memory.
These are controlled by the <code>send.buffer.bytes</code> / <code>receive.buffer.bytes</code> configs.</li>
<li>Deserialized objects buffering: after <code>consumer.poll()</code> returns records, they will be deserialized to extract
the timestamp and buffered internally by the Streams library.
Currently this is only indirectly controlled by <code>buffered.records.per.partition</code>.</li>
<li>RocksDB's own memory usage, both on-heap and off-heap; critical configs (for RocksDB version 4.1.0) include
<code>block_cache_size</code>, <code>write_buffer_size</code> and <code>max_write_buffer_number</code>.
These can be specified through the <code>rocksdb.config.setter</code> configuration.</li>
</ul>
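<p>
As a rough sketch (the concrete values below are arbitrary and only illustrate the idea), these client-level buffers can be bounded through the corresponding configuration parameters:
</p>
<pre class="brush: java;">
Properties settings = new Properties();
// Bound the producer's record buffering to 32 MB
settings.put(StreamsConfig.producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 32 * 1024 * 1024L);
// Bound the consumer's maximum fetch size to 16 MB
settings.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), 16 * 1024 * 1024);
// Limit the number of deserialized records buffered per partition
settings.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 500);
</pre>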
<h3><a id="streams_configure_execute" href="#streams_configure_execute">Application Configuration and Execution</a></h3>
<p>
Besides defining the topology, developers will also need to configure their application
in <code>StreamsConfig</code> before running it. A complete list of
Kafka Streams configs can be found <a href="/{{version}}/documentation/#streamsconfigs"><b>here</b></a>.
Note that different parameters have different "levels of importance", with the following interpretation:
</p>
<ul>
<li> HIGH: you would most likely change the default value if you go to production </li>
<li> MEDIUM: default value might be ok, but you should double-check it </li>
<li> LOW: default value is most likely ok; only consider changing it if you hit issues when running in production </li>
</ul>
<p>
Specifying the configuration in Kafka Streams is similar to the Kafka Producer and Consumer clients. Typically, you create a <code>java.util.Properties</code> instance,
set the necessary parameters, and construct a <code>StreamsConfig</code> instance from the <code>Properties</code> instance.
</p>
<pre class="brush: java;">
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
Properties settings = new Properties();
// Set a few key parameters
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
// Set a few user customized parameters
settings.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
settings.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyTimestampExtractor.class);
// Any further settings
settings.put(... , ...);
// Create an instance of StreamsConfig from the Properties instance
StreamsConfig config = new StreamsConfig(settings);
</pre>
<h4><a id="streams_client_config" href="#streams_client_config">Producer and Consumer Configuration</a></h4>
<p>
Apart from Kafka Streams' own configuration parameters you can also specify parameters for the Kafka consumers and producers that are used internally,
depending on the needs of your application. Similar to the Streams settings you define any such consumer and/or producer settings via <code>StreamsConfig</code>.
Note that some consumer and producer configuration parameters use the same parameter name. For example, <code>send.buffer.bytes</code> and <code>receive.buffer.bytes</code>, which
are used to configure TCP buffers, or <code>request.timeout.ms</code> and <code>retry.backoff.ms</code>, which control retries for client requests, among others.
If you want to set different values for consumer and producer for such a parameter, you can prefix the parameter name with <code>consumer.</code> or <code>producer.</code>:
</p>
<pre class="brush: java;">
Properties settings = new Properties();
// Example of a "normal" setting for Kafka Streams
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
// Customize the Kafka consumer settings
settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
// Customize a common client setting for both consumer and producer
settings.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 100L);
// Customize different values for consumer and producer
settings.put("consumer." + ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
settings.put("producer." + ProducerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);
// Alternatively, you can use
settings.put(StreamsConfig.consumerPrefix(ConsumerConfig.RECEIVE_BUFFER_CONFIG), 1024 * 1024);
settings.put(StreamsConfig.producerPrefix(ProducerConfig.RECEIVE_BUFFER_CONFIG), 64 * 1024);
</pre>
<h4><a id="streams_broker_config" href="#streams_broker_config">Broker Configuration</a></h4>
<p>
Introduced in 0.11.0 is a new broker config that is particularly relevant to Kafka Streams applications, <code>group.initial.rebalance.delay.ms</code>.
This config specifies the time, in milliseconds, that the <code>GroupCoordinator</code> will delay the initial consumer rebalance.
The rebalance will be further delayed by the value of <code>group.initial.rebalance.delay.ms</code> as each new member joins the consumer group, up to a maximum of the value set by <code>max.poll.interval.ms</code>.
The net benefit is that this should reduce the overall startup time for Kafka Streams applications with more than one thread.
The default value for <code>group.initial.rebalance.delay.ms</code> is 3 seconds.
</p>
<p>
In practice this means that if you are starting up your Kafka Streams app from a cold start, then when the first member joins the group there will be at least a 3 second delay before it is assigned any tasks.
If any other members join the group within the initial 3 seconds, then there will be a further 3 second delay.
Once no new members have joined the group within the 3 second delay, or <code>max.poll.interval.ms</code> is reached, then the group rebalance can complete and all current members will be assigned tasks.
The benefit of this approach, particularly for Kafka Streams applications, is that we can now delay the assignment and re-assignment of potentially expensive tasks as new members join.
This way we can avoid the situation where one instance is assigned all tasks, begins restoring/processing, only to be rebalanced shortly afterwards and then have to start over with half of the tasks, and so on.
</p>
<h4><a id="streams_topic_config" href="#streams_topic_config">Internal Topic Configuration</a></h4>
<p>
Kafka Streams automatically creates internal repartitioning and changelog topics.
You can override the default configs used when creating these topics by adding any configs from <code>TopicConfig</code> to your <code>StreamsConfig</code> with the prefix <code>StreamsConfig.TOPIC_PREFIX</code>:
</p>
<pre class="brush: java;">
Properties settings = new Properties();
// Example of a "normal" setting for Kafka Streams
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
// Add a topic config by prefixing with topic.
settings.put(StreamsConfig.TOPIC_PREFIX + TopicConfig.SEGMENT_BYTES_CONFIG, 1024 * 1024);
// Alternatively, you can use
settings.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 1024 * 1024);
</pre>
<p>
For changelog topics you can also override the default configs on a per store basis.
This can be done by using any method overload that has a <code>StateStoreSupplier</code> as a parameter:
</p>
<pre class="brush: java;">
// a map to add topic config
Map&lt;String, String&gt; topicConfig = new HashMap<>();
topicConfig.put(TopicConfig.SEGMENT_MS_CONFIG, "10000");
StateStoreSupplier supplier = Stores.create("store")
.withKeys(Serdes.String())
.withValues(Serdes.String())
.persistent()
.enableLogging(topicConfig) // pass in the config overrides
.build();
groupedStream.count(supplier);
</pre>
<h4><a id="streams_execute" href="#streams_execute">Executing Your Kafka Streams Application</a></h4>
<p>
You can call Kafka Streams from anywhere in your application code.
Very commonly though you would do so within the <code>main()</code> method of your application, or some variant thereof.
</p>
<p>
First, you must create an instance of <code>KafkaStreams</code>.
The first argument of the <code>KafkaStreams</code> constructor takes an instance of <code>Topology</code>.
This topology can be either created directly following the <code>Processor</code> API or implicitly via the <code>StreamsBuilder</code> in the higher-level Streams DSL.
The second argument is an instance of <code>StreamsConfig</code> mentioned above.
</p>
<pre class="brush: java;">
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.
Topology topology = ...; // when using the Processor API
//
// OR
//
StreamsBuilder builder = ...; // when using the Kafka Streams DSL
Topology topology = builder.build();
// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;
KafkaStreams streams = new KafkaStreams(topology, config);
</pre>
<p>
At this point, internal structures have been initialized, but the processing is not started yet. You have to explicitly start the Kafka Streams thread by calling the <code>start()</code> method:
</p>
<pre class="brush: java;">
// Start the Kafka Streams instance
streams.start();
</pre>
<p>
To catch any unexpected exceptions, you may set a <code>java.lang.Thread.UncaughtExceptionHandler</code> before you start the application. This handler is called whenever a stream thread is terminated by an unexpected exception:
</p>
<pre class="brush: java;">
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(final Thread t, final Throwable e) {
// here you should examine the exception and perform an appropriate action!
}
});
</pre>
<p>
To retrieve information about the local running threads, you can use the <code>localThreadsMetadata()</code> method after you start the application.
</p>
<pre class="brush: java;">
// For instance, use this method to print/monitor the partitions assigned to each local task.
Set&lt;ThreadMetadata&gt; threads = streams.localThreadsMetadata();
...
</pre>
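<p>
For example, a sketch of printing the active tasks and their assigned partitions per thread could look as follows:
</p>
<pre class="brush: java;">
for (final ThreadMetadata threadMetadata : streams.localThreadsMetadata()) {
    System.out.println("Thread: " + threadMetadata.threadName());
    for (final TaskMetadata taskMetadata : threadMetadata.activeTasks()) {
        System.out.println("  Task " + taskMetadata.taskId() + " has partitions " + taskMetadata.topicPartitions());
    }
}
</pre>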
<p>
To stop the application instance call the <code>close()</code> method:
</p>
<pre class="brush: java;">
// Stop the Kafka Streams instance
streams.close();
</pre>
<p>
Now it's time to execute your application that uses the Kafka Streams library, which can be run just like any other Java application - there is no special magic or requirement on the side of Kafka Streams.
For example, you can package your Java application as a fat jar file and then start the application via:
</p>
<pre class="brush: bash;">
# Start the application in class `com.example.MyStreamsApp`
# from the fat jar named `path-to-app-fatjar.jar`.
$ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp
</pre>
<p>
When the application instance starts running, the defined processor topology will be initialized as one or more stream tasks that can be executed in parallel by the stream threads within the instance.
If the processor topology defines any state stores, these state stores will also be (re-)constructed, if possible, during the initialization
period of their associated stream tasks.
It is important to understand that, when starting your application as described above, you are actually launching what Kafka Streams considers to be one instance of your application.
More than one instance of your application may be running at a time, and in fact the common scenario is that there are indeed multiple instances of your application running in parallel (e.g., on another JVM or another machine).
In such cases, Kafka Streams transparently re-assigns tasks from the existing instances to the new instance that you just started.
See <a href="/{{version}}/documentation/streams/architecture#streams_architecture_tasks"><b>Stream Partitions and Tasks</b></a> and <a href="/{{version}}/documentation/streams/architecture#streams_architecture_threads"><b>Threading Model</b></a> for details.
</p>
<div class="pagination">
<a href="/{{version}}/documentation/streams/quickstart" class="pagination__btn pagination__btn__prev">Previous</a>
<a href="/{{version}}/documentation/streams/core-concepts" class="pagination__btn pagination__btn__next">Next</a>
</div>
</script>
<!--#include virtual="../../includes/_header.htm" -->
<!--#include virtual="../../includes/_top.htm" -->
<div class="content documentation documentation--current">
<!--#include virtual="../../includes/_nav.htm" -->
<div class="right">
<!--#include virtual="../../includes/_docs_banner.htm" -->
<ul class="breadcrumbs">
<li><a href="/documentation">Documentation</a></li>
<li><a href="/documentation/streams">Kafka Streams API</a></li>
</ul>
<div class="p-content"></div>
</div>
</div>
<!--#include virtual="../../includes/_footer.htm" -->
<script>
$(function() {
// Show selected style on nav item
$('.b-nav__streams').addClass('selected');
// Display docs subnav items
$('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
});
</script>