<li><aclass="reference internal"href="#applying-processors-and-transformers-processor-api-integration"id="id24">Applying processors and transformers (Processor API integration)</a></li>
</ul>
</li>
<li><aclass="reference internal"href="#writing-streams-back-to-kafka"id="id25">Writing streams back to Kafka</a></li>
<h2><aclass="toc-backref"href="#id7">Overview</a><aclass="headerlink"href="#overview"title="Permalink to this headline"></a></h2>
<p>In comparison to the <a class="reference internal" href="processor-api.html#streams-developer-guide-processor-api"><span class="std std-ref">Processor API</span></a>, only the DSL supports:</p>
<ul class="simple">
<li>Built-in abstractions for <a class="reference internal" href="../core-concepts.html#streams_concepts_duality"><span class="std std-ref">streams and tables</span></a> in the form of
<a class="reference internal" href="#streams_concepts_kstream"><span class="std std-ref">KStream</span></a>, <a class="reference internal" href="#streams_concepts_ktable"><span class="std std-ref">KTable</span></a>, and
<a class="reference internal" href="#streams_concepts_globalktable"><span class="std std-ref">GlobalKTable</span></a>. Having first-class support for streams and tables is crucial
because, in practice, most use cases require not just either streams or databases/tables, but a combination of both.
For example, if your use case is to create a customer 360-degree view that is updated in real-time, what your
application will be doing is transforming many input <em>streams</em> of customer-related events into an output <em>table</em>
that contains a continuously updated 360-degree view of your customers.</li>
<li>Declarative, functional programming style with
<a class="reference internal" href="#streams-developer-guide-dsl-transformations-stateless"><span class="std std-ref">stateless transformations</span></a> (e.g. <code class="docutils literal"><span class="pre">map</span></code> and <code class="docutils literal"><span class="pre">filter</span></code>)
as well as <a class="reference internal" href="#streams-developer-guide-dsl-transformations-stateful"><span class="std std-ref">stateful transformations</span></a>
such as <a class="reference internal" href="#streams-developer-guide-dsl-aggregating"><span class="std std-ref">aggregations</span></a> (e.g. <code class="docutils literal"><span class="pre">count</span></code> and <code class="docutils literal"><span class="pre">reduce</span></code>),
<a class="reference internal" href="#streams-developer-guide-dsl-joins"><span class="std std-ref">joins</span></a> (e.g. <code class="docutils literal"><span class="pre">leftJoin</span></code>), and
<a class="reference internal" href="#streams-developer-guide-dsl-windowing"><span class="std std-ref">windowing</span></a> (e.g. session windows).</li>
</ul>
<p>With the DSL, you can define <a class="reference internal" href="../core-concepts.html#streams_topology"><span class="std std-ref">processor topologies</span></a> (i.e., the logical
processing plan) in your application. The steps to accomplish this are:</p>
<olclass="arabic simple">
<li>Specify <aclass="reference internal"href="#streams-developer-guide-dsl-sources"><spanclass="std std-ref">one or more input streams that are read from Kafka topics</span></a>.</li>
<li>Compose <aclass="reference internal"href="#streams-developer-guide-dsl-transformations"><spanclass="std std-ref">transformations</span></a> on these streams.</li>
<li>Write the <aclass="reference internal"href="#streams-developer-guide-dsl-destinations"><spanclass="std std-ref">resulting output streams back to Kafka topics</span></a>, or expose the processing results of your application directly to other applications through <aclass="reference internal"href="interactive-queries.html#streams-developer-guide-interactive-queries"><spanclass="std std-ref">interactive queries</span></a> (e.g., via a REST API).</li>
</ol>
<p>After the application is run, the defined processor topologies are continuously executed (i.e., the processing plan is put into
action). A step-by-step guide for writing a stream processing application using the DSL is provided below.</p>
<p>For a complete list of available API functionality, see also the <a href="../../../javadoc/org/apache/kafka/streams/package-summary.html">Streams</a> API docs.</p>
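<p>To make these three steps concrete, here is a minimal sketch of a complete topology. The topic names and the filtering logic are illustrative assumptions, not taken from a shipped example:</p>
<div class="highlight-java"><div class="highlight"><pre>
StreamsBuilder builder = new StreamsBuilder();

// Step 1: specify an input stream that is read from a (hypothetical) Kafka topic.
KStream<String, String> input = builder.stream("words-input");

// Step 2: compose transformations on the stream.
KStream<String, String> transformed = input.filter((key, value) -> value != null);

// Step 3: write the resulting output stream back to a (hypothetical) Kafka topic.
transformed.to("words-output");

// Put the topology into action; `config` holds your application's Properties.
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
</pre></div></div>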
<p>
Only the <strong>Kafka Streams DSL</strong> has the notion of a <code>KStream</code>.
</p>
<p>
A <strong>KStream</strong> is an abstraction of a <strong>record stream</strong>, where each data record represents a self-contained datum in the unbounded data set. Using the table analogy, data records in a record stream are always interpreted as an "INSERT" -- think: adding more entries to an append-only ledger -- because no record replaces an existing row with the same key. Examples are a credit card transaction, a page view event, or a server log entry.
</p>
<p>
To illustrate, let's imagine the following two data records are being sent to the stream:
</p>
<pre>
("alice", 1) --> ("alice", 3)
</pre>
<p>
If your stream processing application were to sum the values per user, it would return <code>4</code> for <code>alice</code>. Why? Because the second data record would not be considered an update of the previous record. Compare this behavior of KStream to <code>KTable</code> below, which would return <code>3</code> for <code>alice</code>.
</p>
<p>
Only the <strong>Kafka Streams DSL</strong> has the notion of a <code>KTable</code>.
</p>
<p>
A <strong>KTable</strong> is an abstraction of a <strong>changelog stream</strong>, where each data record represents an update. More precisely, the value in a data record is interpreted as an "UPDATE" of the last value for the same record key, if any (if a corresponding key doesn't exist yet, the update will be considered an INSERT). Using the table analogy, a data record in a changelog stream is interpreted as an UPSERT aka INSERT/UPDATE because any existing row with the same key is overwritten. Also, <code>null</code> values are interpreted in a special way: a record with a <code>null</code> value represents a "DELETE" or tombstone for the record's key.
</p>
<p>
To illustrate, let's imagine the following two data records are being sent to the stream:
</p>
<pre>
("alice", 1) --> ("alice", 3)
</pre>
<p>
If your stream processing application were to sum the values per user, it would return <code>3</code> for <code>alice</code>. Why? Because the second data record would be considered an update of the previous record.
</p>
<p>
<strong>Effects of Kafka's log compaction:</strong> Another way of thinking about KStream and KTable is as follows: If you were to store a KTable into a Kafka topic, you'd probably want to enable Kafka's <a href="http://kafka.apache.org/documentation.html#compaction">log compaction</a> feature, e.g. to save storage space.
</p>
<p>
However, it would not be safe to enable log compaction in the case of a KStream because, as soon as log compaction would begin purging older data records of the same key, it would break the semantics of the data. To pick up the illustration example again, you'd suddenly get a <code>3</code> for <code>alice</code> instead of a <code>4</code> because log compaction would have removed the <code>("alice", 1)</code> data record. Hence log compaction is perfectly safe for a KTable (changelog stream) but it is a mistake for a KStream (record stream).
</p>
<p>
We have already seen an example of a changelog stream in the section on the <a class="reference internal" href="../core-concepts.html#streams_concepts_duality"><span class="std std-ref">duality of streams and tables</span></a>. Another example is change data capture (CDC) records in the changelog of a relational database, representing which row in a database table was inserted, updated, or deleted.
</p>
<p>
KTable also provides an ability to look up <em>current</em> values of data records by keys. This table-lookup functionality is available through <a class="reference internal" href="#streams-developer-guide-dsl-joins"><span class="std std-ref">join operations</span></a> (see also <strong>Joining</strong> in the Developer Guide) as well as through <a class="reference internal" href="interactive-queries.html#streams-developer-guide-interactive-queries"><span class="std std-ref">Interactive Queries</span></a>.
</p>
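<p>As a minimal sketch of the table-lookup idea via Interactive Queries (the store name and types are illustrative assumptions):</p>
<div class="highlight-java"><div class="highlight"><pre>
KafkaStreams streams = ...;

// Look up the current value for a key in the local state store that backs a KTable.
ReadOnlyKeyValueStore<String, Long> store =
    streams.store("word-counts-store", QueryableStoreTypes.keyValueStore());
Long count = store.get("alice");
</pre></div></div>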
<p>Only the <strong>Kafka Streams DSL</strong> has the notion of a <strong>GlobalKTable</strong>.</p>
<p>
Like a <strong>KTable</strong>, a <strong>GlobalKTable</strong> is an abstraction of a <strong>changelog stream</strong>, where each data record represents an update.
</p>
<p>
A GlobalKTable differs from a KTable in the data they are populated with, i.e., which data from the underlying Kafka topic is read into the respective table. Slightly simplified, imagine you have an input topic with 5 partitions. In your application, you want to read this topic into a table. Also, you want to run your application across 5 application instances for <strong>maximum parallelism</strong>.
</p>
<ul>
<li>
If you read the input topic into a <strong>KTable</strong>, then the "local" KTable instance of each application instance will be populated with data <strong>from only 1 partition</strong> of the topic's 5 partitions.
</li>
<li>
If you read the input topic into a <strong>GlobalKTable</strong>, then the local GlobalKTable instance of each application instance will be populated with data <strong>from all partitions of the topic</strong>.
</li>
</ul>
<p>
GlobalKTable provides the ability to look up <em>current</em> values of data records by keys. This table-lookup functionality is available through <a class="reference internal" href="#streams-developer-guide-dsl-joins"><span class="std std-ref">join operations</span></a>.
</p>
<p>Benefits of global tables:</p>
<ul>
<li>
More convenient and/or efficient <strong>joins</strong>: Notably, global tables allow you to perform star joins, they support "foreign-key" lookups (i.e., you can look up data in the table not just by record key, but also by data in the record values), and they are more efficient when chaining multiple joins. Also, when joining against a global table, the input data does not need to be <strong>co-partitioned</strong>.
</li>
<li>
Can be used to "broadcast" information to all the running instances of your application.
</li>
</ul>
<p>Downsides of global tables:</p>
<ul>
<li>Increased local storage consumption compared to the (partitioned) KTable because the entire topic is tracked.</li>
<li>Increased network and Kafka broker load compared to the (partitioned) KTable because the entire topic is read.</li>
</ul>
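<p>The difference in how the two tables are populated is easiest to see in code. The following is a minimal sketch, assuming a hypothetical five-partition input topic named "user-profiles"; in a real application you would pick one of the two, not both:</p>
<div class="highlight-java"><div class="highlight"><pre>
StreamsBuilder builder = new StreamsBuilder();

// KTable: each application instance is populated with data from only the
// partitions assigned to that instance (here: 1 of the 5 partitions).
KTable<String, String> userProfiles = builder.table("user-profiles");

// GlobalKTable: each application instance is populated with data from ALL
// partitions of the topic, i.e. it holds a full local copy.
GlobalKTable<String, String> allUserProfiles = builder.globalTable("user-profiles");
</pre></div></div>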
<spanid="streams-developer-guide-dsl-sources"></span><h2><aclass="toc-backref"href="#id8">Creating source streams from Kafka</a><aclass="headerlink"href="#creating-source-streams-from-kafka"title="Permalink to this headline"></a></h2>
<p>You can easily read data from Kafka topics into your application. The following operations are supported.</p>
<td><pclass="first">Creates a <aclass="reference internal"href="#streams_concepts_kstream"><spanclass="std std-ref">KStream</span></a> from the specified Kafka input topics and interprets the data
as a <aclass="reference internal"href="#streams_concepts_kstream"><spanclass="std std-ref">record stream</span></a>.
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Long</span><spanclass="o">()</span><spanclass="cm">/* value serde */</span>
<spanclass="o">);</span>
</pre></div>
</div>
<p>If you do not specify SerDes explicitly, the default SerDes from the
<a class="reference internal" href="config-streams.html#streams-developer-guide-configuration"><span class="std std-ref">configuration</span></a> are used.</p>
<p>You <strong>must specify SerDes explicitly</strong> if the key or value types of the records in the Kafka input
topics do not match the configured default SerDes. For information about configuring default SerDes, available
SerDes, and implementing your own custom SerDes see <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data Types and Serialization</span></a>.</p>
<p class="last">Several variants of <code class="docutils literal"><span class="pre">stream</span></code> exist, for example to specify a regex pattern for input topics to read from.</p>
<td><pclass="first">Reads the specified Kafka input topic into a <aclass="reference internal"href="#streams_concepts_ktable"><spanclass="std std-ref">KTable</span></a>. The topic is
interpreted as a changelog stream, where records with the same key are interpreted as UPSERT aka INSERT/UPDATE
(when the record value is not <codeclass="docutils literal"><spanclass="pre">null</span></code>) or as DELETE (when the value is <codeclass="docutils literal"><spanclass="pre">null</span></code>) for that key.
<aclass="reference internal"href="../architecture.html#streams_architecture_state"><spanclass="std std-ref">state store</span></a> that backs the table). This is required for
supporting <aclass="reference internal"href="interactive-queries.html#streams-developer-guide-interactive-queries"><spanclass="std std-ref">interactive queries</span></a> against the table. When a
name is not provided the table will not queryable and an internal name will be provided for the state store.</p>
<p>If you do not specify SerDes explicitly, the default SerDes from the
<a class="reference internal" href="config-streams.html#streams-developer-guide-configuration"><span class="std std-ref">configuration</span></a> are used.</p>
<p>You <strong>must specify SerDes explicitly</strong> if the key or value types of the records in the Kafka input
topics do not match the configured default SerDes. For information about configuring default SerDes, available
SerDes, and implementing your own custom SerDes see <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data Types and Serialization</span></a>.</p>
<pclass="last">Several variants of <codeclass="docutils literal"><spanclass="pre">table</span></code> exist, for example to specify the <codeclass="docutils literal"><spanclass="pre">auto.offset.reset</span></code> policy to be used when
<td><pclass="first">Reads the specified Kafka input topic into a <aclass="reference internal"href="#streams_concepts_globalktable"><spanclass="std std-ref">GlobalKTable</span></a>. The topic is
interpreted as a changelog stream, where records with the same key are interpreted as UPSERT aka INSERT/UPDATE
(when the record value is not <codeclass="docutils literal"><spanclass="pre">null</span></code>) or as DELETE (when the value is <codeclass="docutils literal"><spanclass="pre">null</span></code>) for that key.
<aclass="reference internal"href="../architecture.html#streams_architecture_state"><spanclass="std std-ref">state store</span></a> that backs the table). This is required for
supporting <aclass="reference internal"href="interactive-queries.html#streams-developer-guide-interactive-queries"><spanclass="std std-ref">interactive queries</span></a> against the table. When a
name is not provided the table will not queryable and an internal name will be provided for the state store.</p>
<spanclass="o">.</span><spanclass="na">withValueSerde</span><spanclass="o">(</span><spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Long</span><spanclass="o">())</span><spanclass="cm">/* value serde */</span>
<spanclass="o">);</span>
</pre></div>
</div>
<p>You <strong>must specify SerDes explicitly</strong> if the key or value types of the records in the Kafka input
topics do not match the configured default SerDes. For information about configuring default SerDes, available
SerDes, and implementing your own custom SerDes see <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data Types and Serialization</span></a>.</p>
<p class="last">Several variants of <code class="docutils literal"><span class="pre">globalTable</span></code> exist, e.g. to specify explicit SerDes.</p>
</td>
</tr>
</tbody>
</table>
</div>
<divclass="section"id="transform-a-stream">
<spanid="streams-developer-guide-dsl-transformations"></span><h2><aclass="toc-backref"href="#id9">Transform a stream</a><aclass="headerlink"href="#transform-a-stream"title="Permalink to this headline"></a></h2>
<p>The KStream and KTable interfaces support a variety of transformation operations.
Each of these operations can be translated into one or more connected processors in the underlying processor topology.
Since KStream and KTable are strongly typed, all of these transformation operations are defined as
generic functions where users can specify the input and output data types.</p>
<p>Some KStream transformations may generate one or more KStream objects, for example:</p>
<ul class="simple">
<li><code class="docutils literal"><span class="pre">filter</span></code> and <code class="docutils literal"><span class="pre">map</span></code> on a KStream will generate another KStream</li>
<li><code class="docutils literal"><span class="pre">branch</span></code> on a KStream can generate multiple KStreams</li>
</ul>
<p>Some others may generate a KTable object, for example an aggregation of a KStream also yields a KTable. This allows Kafka Streams to continuously update the computed value upon arrivals of <a class="reference internal" href="../core-concepts.html#streams_concepts_aggregations"><span class="std std-ref">late records</span></a> after it has already been produced and emitted.</p>
<spanid="streams-developer-guide-dsl-transformations-stateless"></span><h3><aclass="toc-backref"href="#id10">Stateless transformations</a><aclass="headerlink"href="#stateless-transformations"title="Permalink to this headline"></a></h3>
<p>Stateless transformations do not require state for processing and they do not require a state store associated with
the stream processor. Kafka 0.11.0 and later allows you to materialize the result from a stateless <code class="docutils literal"><span class="pre">KTable</span></code> transformation. This allows the result to be queried through <a class="reference internal" href="interactive-queries.html#streams-developer-guide-interactive-queries"><span class="std std-ref">interactive queries</span></a>. To materialize a <code class="docutils literal"><span class="pre">KTable</span></code>, each of the below stateless operations <a class="reference internal" href="interactive-queries.html#streams-developer-guide-interactive-queries-local-key-value-stores"><span class="std std-ref">can be augmented</span></a> with an optional <code class="docutils literal"><span class="pre">queryableStoreName</span></code> argument.</p>
<td><pclass="first">Branch (or split) a <codeclass="docutils literal"><spanclass="pre">KStream</span></code> based on the supplied predicates into one or more <codeclass="docutils literal"><spanclass="pre">KStream</span></code> instances.
<spanclass="o">(</span><spanclass="n">key</span><spanclass="o">,</span><spanclass="n">value</span><spanclass="o">)</span><spanclass="o">-></span><spanclass="n">key</span><spanclass="o">.</span><spanclass="na">startsWith</span><spanclass="o">(</span><spanclass="s">"A"</span><spanclass="o">),</span><spanclass="cm">/* first predicate */</span>
<spanclass="o">(</span><spanclass="n">key</span><spanclass="o">,</span><spanclass="n">value</span><spanclass="o">)</span><spanclass="o">-></span><spanclass="n">key</span><spanclass="o">.</span><spanclass="na">startsWith</span><spanclass="o">(</span><spanclass="s">"B"</span><spanclass="o">),</span><spanclass="cm">/* second predicate */</span>
<spanclass="o">(</span><spanclass="n">key</span><spanclass="o">,</span><spanclass="n">value</span><spanclass="o">)</span><spanclass="o">-></span><spanclass="kc">true</span><spanclass="cm">/* third predicate */</span>
<spanclass="o">);</span>
<spanclass="c1">// KStream branches[0] contains all records whose keys start with "A"</span>
<spanclass="c1">// KStream branches[1] contains all records whose keys start with "B"</span>
<spanclass="c1">// KStream branches[2] contains all other records</span>
<spanclass="c1">// Java 7 example: cf. `filter` for how to create `Predicate` instances</span>
<p><strong>Marks the stream for data re-partitioning:</strong>
Applying a grouping or a join after <code class="docutils literal"><span class="pre">flatMap</span></code> will result in re-partitioning of the records.
If possible use <code class="docutils literal"><span class="pre">flatMapValues</span></code> instead, which will not cause data re-partitioning.</p>
<p><codeclass="docutils literal"><spanclass="pre">flatMapValues</span></code> is preferable to <codeclass="docutils literal"><spanclass="pre">flatMap</span></code> because it will not cause data re-partitioning. However, you
cannot modify the key or key type like <codeclass="docutils literal"><spanclass="pre">flatMap</span></code> does.</p>
<divclass="last highlight-java"><divclass="highlight"><pre><span></span><spanclass="c1">// Split a sentence into words.</span>
<p>You would use <code class="docutils literal"><span class="pre">foreach</span></code> to cause <em>side effects</em> based on the input data (similar to <code class="docutils literal"><span class="pre">peek</span></code>) and then <em>stop</em>
<em>further processing</em> of the input data (unlike <code class="docutils literal"><span class="pre">peek</span></code>, which is not a terminal operation).</p>
<p><strong>Note on processing guarantees:</strong> Any side effects of an action (such as writing to external systems) are not
trackable by Kafka, which means they will typically not benefit from Kafka’s processing guarantees.</p>
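<p>A minimal sketch of <code class="docutils literal"><span class="pre">foreach</span></code>; the print action is an illustrative assumption:</p>
<div class="highlight-java"><div class="highlight"><pre>
KStream<String, Long> stream = ...;

// Terminal operation: performs a side effect per record and ends this branch of the topology.
stream.foreach((key, value) -> System.out.println(key + " => " + value));
</pre></div></div>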
<p>Grouping is a prerequisite for <a class="reference internal" href="#streams-developer-guide-dsl-aggregating"><span class="std std-ref">aggregating a stream or a table</span></a>
and ensures that data is properly partitioned (“keyed”) for subsequent operations.</p>
<p><strong>When to set explicit SerDes:</strong>
Variants of <code class="docutils literal"><span class="pre">groupByKey</span></code> exist to override the configured default SerDes of your application, which <strong>you</strong>
<strong>must do</strong> if the key and/or value types of the resulting <code class="docutils literal"><span class="pre">KGroupedStream</span></code> do not match the configured default SerDes.</p>
<pclass="last"><strong>Grouping vs. Windowing:</strong>
A related operation is <aclass="reference internal"href="#streams-developer-guide-dsl-windowing"><spanclass="std std-ref">windowing</span></a>, which lets you control how to
“sub-group” the grouped records <em>of the same key</em> into so-called <em>windows</em> for stateful operations such as
windowed <aclass="reference internal"href="#streams-developer-guide-dsl-aggregating"><spanclass="std std-ref">aggregations</span></a> or
<p><strong>Causes data re-partitioning if and only if the stream was marked for re-partitioning.</strong>
<code class="docutils literal"><span class="pre">groupByKey</span></code> is preferable to <code class="docutils literal"><span class="pre">groupBy</span></code> because it re-partitions data only if the stream was already marked
for re-partitioning. However, <code class="docutils literal"><span class="pre">groupByKey</span></code> does not allow you to modify the key or key type like <code class="docutils literal"><span class="pre">groupBy</span></code> does.</p>
<td><pclass="first">Groups the records by a <em>new</em> key, which may be of a different key type.
When grouping a table, you may also specify a new value and value type.
<codeclass="docutils literal"><spanclass="pre">groupBy</span></code> is a shorthand for <codeclass="docutils literal"><spanclass="pre">selectKey(...).groupByKey()</span></code>.
<p>Grouping is a prerequisite for <a class="reference internal" href="#streams-developer-guide-dsl-aggregating"><span class="std std-ref">aggregating a stream or a table</span></a>
and ensures that data is properly partitioned (“keyed”) for subsequent operations.</p>
<p><strong>When to set explicit SerDes:</strong>
Variants of <code class="docutils literal"><span class="pre">groupBy</span></code> exist to override the configured default SerDes of your application, which <strong>you must</strong>
<strong>do</strong> if the key and/or value types of the resulting <code class="docutils literal"><span class="pre">KGroupedStream</span></code> or <code class="docutils literal"><span class="pre">KGroupedTable</span></code> do not match the
configured default SerDes.</p>
<pclass="last"><strong>Grouping vs. Windowing:</strong>
A related operation is <aclass="reference internal"href="#streams-developer-guide-dsl-windowing"><spanclass="std std-ref">windowing</span></a>, which lets you control how to
“sub-group” the grouped records <em>of the same key</em> into so-called <em>windows</em> for stateful operations such as
windowed <aclass="reference internal"href="#streams-developer-guide-dsl-aggregating"><spanclass="std std-ref">aggregations</span></a> or
<p><strong>Always causes data re-partitioning:</strong> <code class="docutils literal"><span class="pre">groupBy</span></code> always causes data re-partitioning.
If possible use <code class="docutils literal"><span class="pre">groupByKey</span></code> instead, which will re-partition data only if required.</p>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">String</span><spanclass="o">(),</span><spanclass="cm">/* key (note: type was modified) */</span>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">String</span><spanclass="o">())</span><spanclass="cm">/* value */</span>
<spanclass="o">);</span>
<spanclass="c1">// Group the table by a new key and key type, and also modify the value and value type.</span>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">String</span><spanclass="o">(),</span><spanclass="cm">/* key (note: type was modified) */</span>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Integer</span><spanclass="o">())</span><spanclass="cm">/* value (note: type was modified) */</span>
<spanclass="o">);</span>
<spanclass="c1">// Java 7 examples</span>
<spanclass="c1">// Group the stream by a new key and key type</span>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">String</span><spanclass="o">(),</span><spanclass="cm">/* key (note: type was modified) */</span>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">String</span><spanclass="o">())</span><spanclass="cm">/* value */</span>
<spanclass="o">);</span>
<spanclass="c1">// Group the table by a new key and key type, and also modify the value and value type.</span>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">String</span><spanclass="o">(),</span><spanclass="cm">/* key (note: type was modified) */</span>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Integer</span><spanclass="o">())</span><spanclass="cm">/* value (note: type was modified) */</span>
<p><codeclass="docutils literal"><spanclass="pre">mapValues</span></code> is preferable to <codeclass="docutils literal"><spanclass="pre">map</span></code> because it will not cause data re-partitioning. However, it does not
allow you to modify the key or key type like <codeclass="docutils literal"><spanclass="pre">map</span></code> does.</p>
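<p>A minimal sketch of <code class="docutils literal"><span class="pre">mapValues</span></code>; the upper-casing transformation is an illustrative assumption:</p>
<div class="highlight-java"><div class="highlight"><pre>
KStream<byte[], String> stream = ...;

// Transform only the value; the key and key type stay unchanged,
// so no downstream re-partitioning is needed.
KStream<byte[], String> uppercased = stream.mapValues(value -> value.toUpperCase());
</pre></div></div>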
<td><p class="first">Merges records of two streams into one larger stream. There is no ordering guarantee between records
from different streams in the merged stream. Relative order is preserved within each input stream though (i.e., records within the same input stream are processed in order).</p>
<p>You would use <code class="docutils literal"><span class="pre">peek</span></code> to cause <em>side effects</em> based on the input data (similar to <code class="docutils literal"><span class="pre">foreach</span></code>) and <em>continue</em>
<em>processing</em> the input data (unlike <code class="docutils literal"><span class="pre">foreach</span></code>, which is a terminal operation). <code class="docutils literal"><span class="pre">peek</span></code> returns the input
stream as-is; if you need to modify the input stream, use <code class="docutils literal"><span class="pre">map</span></code> or <code class="docutils literal"><span class="pre">mapValues</span></code> instead.</p>
<p><code class="docutils literal"><span class="pre">peek</span></code> is helpful for use cases such as logging or tracking metrics or for debugging and troubleshooting.</p>
<p><strong>Note on processing guarantees:</strong> Any side effects of an action (such as writing to external systems) are not
trackable by Kafka, which means they will typically not benefit from Kafka’s processing guarantees.</p>
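<p>A minimal sketch of <code class="docutils literal"><span class="pre">peek</span></code>; the logging action is an illustrative assumption:</p>
<div class="highlight-java"><div class="highlight"><pre>
KStream<byte[], String> stream = ...;

// Perform a side effect per record and keep processing; `peek` returns the stream unchanged.
KStream<byte[], String> unmodified =
    stream.peek((key, value) -> System.out.println("key=" + key + ", value=" + value));
</pre></div></div>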
<td><pclass="first"><strong>Terminal operation.</strong> Prints the records to <codeclass="docutils literal"><spanclass="pre">System.out</span></code>. See Javadocs for serde and <codeclass="docutils literal"><spanclass="pre">toString()</span></code>
<p>Calling <codeclass="docutils literal"><spanclass="pre">print()</span></code> is the same as calling <codeclass="docutils literal"><spanclass="pre">foreach((key,</span><spanclass="pre">value)</span><spanclass="pre">-></span><spanclass="pre">System.out.println(key</span><spanclass="pre">+</span><spanclass="pre">",</span><spanclass="pre">"</span><spanclass="pre">+</span><spanclass="pre">value))</span></code></p>
<p><codeclass="docutils literal"><spanclass="pre">print</span></code> is mainly for debugging/testing purposes, and it will try to flush on each record print. Hence it <strong>should not</strong> be used for production usage if performance requirements are concerned.</p>
<p>Calling <codeclass="docutils literal"><spanclass="pre">selectKey(mapper)</span></code> is the same as calling <codeclass="docutils literal"><spanclass="pre">map((key,</span><spanclass="pre">value)</span><spanclass="pre">-></span><spanclass="pre">mapper(key,</span><spanclass="pre">value),</span><spanclass="pre">value)</span></code>.</p>
<p><strong>Marks the stream for data re-partitioning:</strong>
Applying a grouping or a join after <codeclass="docutils literal"><spanclass="pre">selectKey</span></code> will result in re-partitioning of the records.</p>
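<p>A minimal sketch of <code class="docutils literal"><span class="pre">selectKey</span></code>; deriving the new key from the first token of the value is an illustrative assumption:</p>
<div class="highlight-java"><div class="highlight"><pre>
KStream<byte[], String> stream = ...;

// Derive a new key from the record value; this marks the stream for re-partitioning.
KStream<String, String> rekeyed = stream.selectKey((key, value) -> value.split(" ")[0]);
</pre></div></div>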
<spanid="streams-developer-guide-dsl-transformations-stateful"></span><h3><aclass="toc-backref"href="#id11">Stateful transformations</a><aclass="headerlink"href="#stateful-transformations"title="Permalink to this headline"></a></h3>
<pid="streams-developer-guide-dsl-transformations-stateful-overview">Stateful transformations depend on state for processing inputs and producing outputs and require a <aclass="reference internal"href="../architecture.html#streams_architecture_state"><spanclass="std std-ref">state store</span></a> associated with the stream processor. For example, in aggregating operations, a windowing state store is used to collect the latest aggregation results per
See <aclass="reference internal"href="../architecture.html#streams_architecture_recovery"><spanclass="std std-ref">Fault Tolerance</span></a> for further information.</p>
<li><aclass="reference internal"href="#streams-developer-guide-dsl-windowing"><spanclass="std std-ref">Windowing</span></a> (as part of aggregations and joins)</li>
<li><aclass="reference internal"href="#streams-developer-guide-dsl-process"><spanclass="std std-ref">Applying custom processors and transformers</span></a>, which may be stateful, for
Processor API integration</li>
</ul>
<p>The following diagram shows their relationships:</p>
<pclass="caption"><spanclass="caption-text">Stateful transformations in the DSL.</span></p>
</div>
<p>Here is an example of a stateful application: the WordCount algorithm.</p>
<p>WordCount example in Java 8+, using lambda expressions:</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span><spanclass="c1">// Assume the record values represent lines of text. For the sake of this example, you can ignore</span>
<spanclass="c1">// whatever may be stored in the record keys.</span>
<divclass="highlight-java"><divclass="highlight"><pre><span></span><spanclass="c1">// Code below is equivalent to the previous Java 8+ example above.</span>
<spanid="streams-developer-guide-dsl-aggregating"></span><h4><aclass="toc-backref"href="#id12">Aggregating</a><aclass="headerlink"href="#aggregating"title="Permalink to this headline"></a></h4>
<p>After records are <a class="reference internal" href="#streams-developer-guide-dsl-transformations-stateless"><span class="std std-ref">grouped</span></a> by key via <code class="docutils literal"><span class="pre">groupByKey</span></code> or
<code class="docutils literal"><span class="pre">groupBy</span></code> – and thus represented as either a <code class="docutils literal"><span class="pre">KGroupedStream</span></code> or a <code class="docutils literal"><span class="pre">KGroupedTable</span></code> – they can be aggregated
via an operation such as <code class="docutils literal"><span class="pre">reduce</span></code>. Aggregations are key-based operations, which means that they always operate over records
(notably record values) of the same key.
You can perform aggregations on <a class="reference internal" href="#streams-developer-guide-dsl-windowing"><span class="std std-ref">windowed</span></a> or non-windowed data.</p>
<td><pclass="first"><strong>Rolling aggregation.</strong> Aggregates the values of (non-windowed) records by the grouped key.
Aggregating is a generalization of <codeclass="docutils literal"><spanclass="pre">reduce</span></code> and allows, for example, the aggregate value to have a different
<p>When aggregating a <em>grouped stream</em>, you must provide an initializer (e.g., <codeclass="docutils literal"><spanclass="pre">aggValue</span><spanclass="pre">=</span><spanclass="pre">0</span></code>) and an “adder”
aggregator (e.g., <codeclass="docutils literal"><spanclass="pre">aggValue</span><spanclass="pre">+</span><spanclass="pre">curValue</span></code>). When aggregating a <em>grouped table</em>, you must provide a
<spanclass="n">Materialized</span><spanclass="o">.</span><spanclass="na">as</span><spanclass="o">(</span><spanclass="s">"aggregated-stream-store"</span><spanclass="o">)</span><spanclass="cm">/* state store name */</span>
<spanclass="o">.</span><spanclass="na">withValueSerde</span><spanclass="o">(</span><spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Long</span><spanclass="o">());</span><spanclass="cm">/* serde for aggregate value */</span>
<spanclass="c1">// Aggregating a KGroupedTable (note how the value type changes from String to Long)</span>
<spanclass="n">Materialized</span><spanclass="o">.</span><spanclass="na">as</span><spanclass="o">(</span><spanclass="s">"aggregated-table-store"</span><spanclass="o">)</span><spanclass="cm">/* state store name */</span>
<spanclass="o">.</span><spanclass="na">withValueSerde</span><spanclass="o">(</span><spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Long</span><spanclass="o">())</span><spanclass="cm">/* serde for aggregate value */</span>
<spanclass="c1">// Java 7 examples</span>
<spanclass="c1">// Aggregating a KGroupedStream (note how the value type changes from String to Long)</span>
<p>Detailed behavior of <code class="docutils literal"><span class="pre">KGroupedStream</span></code>:</p>
<ul class="simple">
<li>Input records with <code class="docutils literal"><span class="pre">null</span></code> keys are ignored.</li>
<li>When a record key is received for the first time, the initializer is called (and called before the adder).</li>
<li>Whenever a record with a non-<code class="docutils literal"><span class="pre">null</span></code> value is received, the adder is called.</li>
</ul>
<p>Detailed behavior of <code class="docutils literal"><span class="pre">KGroupedTable</span></code>:</p>
<ul class="simple">
<li>Input records with <code class="docutils literal"><span class="pre">null</span></code> keys are ignored.</li>
<li>When a record key is received for the first time, the initializer is called (and called before the adder
and subtractor). Note that, in contrast to <code class="docutils literal"><span class="pre">KGroupedStream</span></code>, over time the initializer may be called
more than once for a key as a result of having received input tombstone records for that key (see below).</li>
<li>When the first non-<code class="docutils literal"><span class="pre">null</span></code> value is received for a key (e.g., INSERT), then only the adder is called.</li>
<li>When subsequent non-<code class="docutils literal"><span class="pre">null</span></code> values are received for a key (e.g., UPDATE), then (1) the subtractor is
called with the old value as stored in the table and (2) the adder is called with the new value of the
input record that was just received. The order of execution for the subtractor and adder is not defined.</li>
<li>When a tombstone record – i.e. a record with a <code class="docutils literal"><span class="pre">null</span></code> value – is received for a key (e.g., DELETE),
then only the subtractor is called. Note that, whenever the subtractor returns a <code class="docutils literal"><span class="pre">null</span></code> value itself,
then the corresponding key is removed from the resulting <code class="docutils literal"><span class="pre">KTable</span></code>. If that happens, any next input
record for that key will trigger the initializer again.</li>
</ul>
<p class="last">See the example at the bottom of this section for a visualization of the aggregation semantics.</p>
<td><p class="first"><strong>Windowed aggregation.</strong>
Aggregates the values of records, <a class="reference internal" href="#streams-developer-guide-dsl-windowing"><span class="std std-ref">per window</span></a>, by the grouped key.
Aggregating is a generalization of <code class="docutils literal"><span class="pre">reduce</span></code> and allows, for example, the aggregate value to have a different
type than the input values.</p>
<p>You must provide an initializer (e.g., <code class="docutils literal"><span class="pre">aggValue = 0</span></code>), “adder” aggregator (e.g., <code class="docutils literal"><span class="pre">aggValue + curValue</span></code>),
and a window. When windowing based on sessions, you must additionally provide a “session merger” aggregator
(e.g., <code class="docutils literal"><span class="pre">mergedAggValue = leftAggValue + rightAggValue</span></code>).</p>
<spanclass="n">Materialized</span><spanclass="o">.<</span><spanclass="n">String</span><spanclass="o">,</span><spanclass="n">Long</span><spanclass="o">,</span><spanclass="n">WindowStore</span><spanclass="o"><</span><spanclass="n">Bytes</span><spanclass="o">,</span><spanclass="kt">byte</span><spanclass="o">[]>></span><spanclass="n">as</span><spanclass="o">(</span><spanclass="s">"time-windowed-aggregated-stream-store"</span><spanclass="o">)</span><spanclass="cm">/* state store name */</span>
<spanclass="o">.</span><spanclass="na">withValueSerde</span><spanclass="o">(</span><spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Long</span><spanclass="o">()));</span><spanclass="cm">/* serde for aggregate value */</span>
<spanclass="c1">// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)</span>
<spanclass="n">Materialized</span><spanclass="o">.<</span><spanclass="n">String</span><spanclass="o">,</span><spanclass="n">Long</span><spanclass="o">,</span><spanclass="n">SessionStore</span><spanclass="o"><</span><spanclass="n">Bytes</span><spanclass="o">,</span><spanclass="kt">byte</span><spanclass="o">[]>></span><spanclass="n">as</span><spanclass="o">(</span><spanclass="s">"sessionized-aggregated-stream-store"</span><spanclass="o">)</span><spanclass="cm">/* state store name */</span>
<spanclass="o">.</span><spanclass="na">withValueSerde</span><spanclass="o">(</span><spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Long</span><spanclass="o">()));</span><spanclass="cm">/* serde for aggregate value */</span>
<spanclass="c1">// Java 7 examples</span>
<spanclass="c1">// Aggregating with time-based windowing (here: with 5-minute tumbling windows)</span>
<p>Detailed behavior:</p>
<ul class="simple">
<li>The windowed aggregate behaves similar to the rolling aggregate described above. The additional twist is that
the behavior applies <em>per window</em>.</li>
<li>Input records with <code class="docutils literal"><span class="pre">null</span></code> keys are ignored in general.</li>
<li>When a record key is received for the first time for a given window, the initializer is called (and called
before the adder).</li>
<li>Whenever a record with a non-<code class="docutils literal"><span class="pre">null</span></code> value is received for a given window, the adder is called.</li>
<li>When using session windows: the session merger is called whenever two sessions are being merged.</li>
</ul>
<p class="last">See the example at the bottom of this section for a visualization of the aggregation semantics.</p>
<p>Detailed behavior for <code class="docutils literal"><span class="pre">KGroupedStream</span></code>:</p>
<ul class="simple">
<li>Input records with <code class="docutils literal"><span class="pre">null</span></code> keys or values are ignored.</li>
</ul>
<p>Detailed behavior for <code class="docutils literal"><span class="pre">KGroupedTable</span></code>:</p>
<ul class="last simple">
<li>Input records with <code class="docutils literal"><span class="pre">null</span></code> keys are ignored. Records with <code class="docutils literal"><span class="pre">null</span></code> values are not ignored but interpreted
as “tombstones” for the corresponding key, which indicate the deletion of the key from the table.</li>
</ul>
<td><p class="first"><strong>Windowed aggregation.</strong>
Counts the number of records, <a class="reference internal" href="#streams-developer-guide-dsl-windowing"><span class="std std-ref">per window</span></a>, by the grouped key.</p>
<p>When reducing a <em>grouped stream</em>, you must provide an “adder” reducer (e.g., <code class="docutils literal"><span class="pre">aggValue + curValue</span></code>).
When reducing a <em>grouped table</em>, you must additionally provide a “subtractor” reducer (e.g.,
<code class="docutils literal"><span class="pre">aggValue - oldValue</span></code>).</p>
<p>Detailed behavior for <code class="docutils literal"><span class="pre">KGroupedStream</span></code>:</p>
<ul class="simple">
<li>Input records with <code class="docutils literal"><span class="pre">null</span></code> keys are ignored in general.</li>
<li>When a record key is received for the first time, then the value of that record is used as the initial
aggregate value.</li>
<li>Whenever a record with a non-<code class="docutils literal"><span class="pre">null</span></code> value is received, the adder is called.</li>
</ul>
<p>Detailed behavior for <code class="docutils literal"><span class="pre">KGroupedTable</span></code>:</p>
<ul class="simple">
<li>Input records with <code class="docutils literal"><span class="pre">null</span></code> keys are ignored in general.</li>
<li>When a record key is received for the first time, then the value of that record is used as the initial
aggregate value.
Note that, in contrast to <code class="docutils literal"><span class="pre">KGroupedStream</span></code>, over time this initialization step may happen more than once
for a key as a result of having received input tombstone records for that key (see below).</li>
<li>When the first non-<code class="docutils literal"><span class="pre">null</span></code> value is received for a key (e.g., INSERT), then only the adder is called.</li>
<li>When subsequent non-<code class="docutils literal"><span class="pre">null</span></code> values are received for a key (e.g., UPDATE), then (1) the subtractor is
called with the old value as stored in the table and (2) the adder is called with the new value of the
input record that was just received. The order of execution for the subtractor and adder is not defined.</li>
<li>When a tombstone record – i.e. a record with a <code class="docutils literal"><span class="pre">null</span></code> value – is received for a key (e.g., DELETE),
then only the subtractor is called. Note that, whenever the subtractor returns a <code class="docutils literal"><span class="pre">null</span></code> value itself,
then the corresponding key is removed from the resulting <code class="docutils literal"><span class="pre">KTable</span></code>. If that happens, any next input
record for that key will re-initialize its aggregate value.</li>
</ul>
<p class="last">See the example at the bottom of this section for a visualization of the aggregation semantics.</p>
<td><p class="first"><strong>Windowed aggregation.</strong>
Combines the values of records, <a class="reference internal" href="#streams-developer-guide-dsl-windowing"><span class="std std-ref">per window</span></a>, by the grouped key.
The current record value is combined with the last reduced value, and a new reduced value is returned.
Records with <code class="docutils literal"><span class="pre">null</span></code> key or value are ignored.
The result value type cannot be changed, unlike <code class="docutils literal"><span class="pre">aggregate</span></code>.</p>
<p>The windowed <code class="docutils literal"><span class="pre">reduce</span></code> turns a <code class="docutils literal"><span class="pre">TimeWindowedKStream<K, V></span></code> or a <code class="docutils literal"><span class="pre">SessionWindowedKStream<K, V></span></code>
into a windowed <code class="docutils literal"><span class="pre">KTable<Windowed<K>, V></span></code>.</p>
<p>Several variants of <code class="docutils literal"><span class="pre">reduce</span></code> exist, see Javadocs for details.</p>
<p>Detailed behavior:</p>
<ul class="simple">
<li>The windowed reduce behaves similar to the rolling reduce described above. The additional twist is that the
behavior applies <em>per window</em>.</li>
<li>Input records with <code class="docutils literal"><span class="pre">null</span></code> keys are ignored in general.</li>
<li>When a record key is received for the first time for a given window, then the value of that record is used as
the initial aggregate value.</li>
<li>Whenever a record with a non-<code class="docutils literal"><span class="pre">null</span></code> value is received for a given window, the adder is called.</li>
</ul>
<p class="last">See the example at the bottom of this section for a visualization of the aggregation semantics.</p>
</td>
</tr>
</tbody>
</table>
<p><strong>Example of semantics for stream aggregations:</strong>
A <code class="docutils literal"><span class="pre">KGroupedStream</span></code> → <code class="docutils literal"><span class="pre">KTable</span></code> example is shown below. The streams and the table are initially empty. Bold
font is used in the column for “KTable <code class="docutils literal"><span class="pre">aggregated</span></code>” to highlight changed state. An entry such as <code class="docutils literal"><span class="pre">(hello, 1)</span></code> denotes a
record with key <code class="docutils literal"><span class="pre">hello</span></code> and value <code class="docutils literal"><span class="pre">1</span></code>. To improve the readability of the semantics table you can assume that all records
are processed in timestamp order.</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span><spanclass="c1">// Key: word, value: count</span>
<spanclass="n">Materialized</span><spanclass="o">.<</span><spanclass="n">String</span><spanclass="o">,</span><spanclass="n">Long</span><spanclass="o">,</span><spanclass="n">KeyValueStore</span><spanclass="o"><</span><spanclass="n">Bytes</span><spanclass="o">,</span><spanclass="kt">byte</span><spanclass="o">[]></span><spanclass="n">as</span><spanclass="o">(</span><spanclass="s">"aggregated-stream-store"</span><spanclass="cm">/* state store name */</span><spanclass="o">)</span>
<spanclass="o">.</span><spanclass="na">withValueSerde</span><spanclass="o">(</span><spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Integer</span><spanclass="o">());</span><spanclass="cm">/* serde for aggregate value */</span>
<pclass="last"><strong>Impact of record caches</strong>:
For illustration purposes, the column “KTable <codeclass="docutils literal"><spanclass="pre">aggregated</span></code>” below shows the table’s state changes over time in a
very granular way. In practice, you would observe state changes in such a granular way only when
<aclass="reference internal"href="memory-mgmt.html#streams-developer-guide-memory-management-record-cache"><spanclass="std std-ref">record caches</span></a> are disabled (default: enabled).
When record caches are enabled, what might happen for example is that the output results of the rows with timestamps
4 and 5 would be <aclass="reference internal"href="memory-mgmt.html#streams-developer-guide-memory-management-record-cache"><spanclass="std std-ref">compacted</span></a>, and there would only be
a single state update for the key <codeclass="docutils literal"><spanclass="pre">kafka</span></code> in the KTable (here: from <codeclass="docutils literal"><spanclass="pre">(kafka</span><spanclass="pre">1)</span></code> directly to <codeclass="docutils literal"><spanclass="pre">(kafka,</span><spanclass="pre">3)</span></code>.
Typically, you should only disable record caches for testing or debugging purposes – under normal circumstances it
<p><strong>Example of semantics for table aggregations:</strong>
A <code class="docutils literal"><span class="pre">KGroupedTable</span></code> → <code class="docutils literal"><span class="pre">KTable</span></code> example is shown below. The tables are initially empty. Bold font is used in the column
for “KTable <code class="docutils literal"><span class="pre">aggregated</span></code>” to highlight changed state. An entry such as <code class="docutils literal"><span class="pre">(hello, 1)</span></code> denotes a record with key
<code class="docutils literal"><span class="pre">hello</span></code> and value <code class="docutils literal"><span class="pre">1</span></code>. To improve the readability of the semantics table you can assume that all records are processed
in timestamp order.</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span><spanclass="c1">// Key: username, value: user region (abbreviated to "E" for "Europe", "A" for "Asia")</span>
<spanclass="n">Materialized</span><spanclass="o">.<</span><spanclass="n">String</span><spanclass="o">,</span><spanclass="n">Long</span><spanclass="o">,</span><spanclass="n">KeyValueStore</span><spanclass="o"><</span><spanclass="n">Bytes</span><spanclass="o">,</span><spanclass="kt">byte</span><spanclass="o">[]></span><spanclass="n">as</span><spanclass="o">(</span><spanclass="s">"aggregated-table-store"</span><spanclass="cm">/* state store name */</span><spanclass="o">)</span>
<spanclass="o">.</span><spanclass="na">withValueSerde</span><spanclass="o">(</span><spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Integer</span><spanclass="o">());</span><spanclass="cm">/* serde for aggregate value */</span>
<pclass="last"><strong>Impact of record caches</strong>:
For illustration purposes, the column “KTable <codeclass="docutils literal"><spanclass="pre">aggregated</span></code>” below shows the table’s state changes over time in a
very granular way. In practice, you would observe state changes in such a granular way only when
<aclass="reference internal"href="memory-mgmt.html#streams-developer-guide-memory-management-record-cache"><spanclass="std std-ref">record caches</span></a> are disabled (default: enabled).
When record caches are enabled, what might happen for example is that the output results of the rows with timestamps
4 and 5 would be <aclass="reference internal"href="memory-mgmt.html#streams-developer-guide-memory-management-record-cache"><spanclass="std std-ref">compacted</span></a>, and there would only be
a single state update for the key <codeclass="docutils literal"><spanclass="pre">kafka</span></code> in the KTable (here: from <codeclass="docutils literal"><spanclass="pre">(kafka</span><spanclass="pre">1)</span></code> directly to <codeclass="docutils literal"><spanclass="pre">(kafka,</span><spanclass="pre">3)</span></code>.
Typically, you should only disable record caches for testing or debugging purposes – under normal circumstances it
<spanid="streams-developer-guide-dsl-joins"></span><h4><aclass="toc-backref"href="#id13">Joining</a><aclass="headerlink"href="#joining"title="Permalink to this headline"></a></h4>
<pid="streams-developer-guide-dsl-joins-overview">Streams and tables can also be joined. Many stream processing applications in practice are coded as streaming joins.
For example, applications backing an online shop might need to access multiple, updating database tables (e.g. sales
prices, inventory, customer information) in order to enrich a new data record (e.g. customer transaction) with context
information. That is, scenarios where you need to perform table lookups at very large scale and with a low processing
latency. Here, a popular pattern is to make the information in the databases available in Kafka through so-called
<em>change data capture</em> in combination with <a class="reference internal" href="../../#connect"><span class="std std-ref">Kafka’s Connect API</span></a>, and then implementing
applications that leverage the Streams API to perform very fast and efficient local joins
of such tables and streams, rather than requiring the application to make a query to a remote database over the network
for each record. In this example, the KTable concept in Kafka Streams would enable you to track the latest state
(e.g., snapshot) of each table in a local state store, thus greatly reducing the processing latency as well as
reducing the load of the remote databases when doing such streaming joins.</p>
<p>The following join operations are supported, see also the diagram in the
<a class="reference internal" href="#streams-developer-guide-dsl-transformations-stateful-overview"><span class="std std-ref">overview section</span></a> of Stateful Transformations.
Depending on the operands, joins are either <a class="reference internal" href="#streams-developer-guide-dsl-windowing"><span class="std std-ref">windowed</span></a> joins or
non-windowed joins.</p>
<spanid="streams-developer-guide-dsl-joins-co-partitioning"></span><h5><aclass="toc-backref"href="#id14">Join co-partitioning requirements</a><aclass="headerlink"href="#join-co-partitioning-requirements"title="Permalink to this headline"></a></h5>
<p>Input data must be co-partitioned when joining. This ensures that input records with the same key, from both sides of the
join, are delivered to the same stream task during processing.
<strong>It is the responsibility of the user to ensure data co-partitioning when joining</strong>.</p>
<pclass="last">If possible, consider using <aclass="reference internal"href="#streams_concepts_globalktable"><spanclass="std std-ref">global tables</span></a> (<codeclass="docutils literal"><spanclass="pre">GlobalKTable</span></code>) for joining because they do not require data co-partitioning.</p>
<li>All applications that write to the input topics must use the same partitioning strategy.
This means that, for example, applications that use Kafka’s <a class="reference internal" href="../../#producerapi"><span class="std std-ref">Java Producer API</span></a> must use the
same partitioner (cf. the producer setting <code class="docutils literal"><span class="pre">"partitioner.class"</span></code> aka <code class="docutils literal"><span class="pre">ProducerConfig.PARTITIONER_CLASS_CONFIG</span></code>),
and applications that use Kafka’s Streams API must use the same <code class="docutils literal"><span class="pre">StreamPartitioner</span></code> for operations such as
<code class="docutils literal"><span class="pre">KStream#to()</span></code>. The good news is that, if you happen to use the default partitioner-related settings across all
applications, you do not need to worry about the partitioning strategy.</li>
<p>Why is data co-partitioning required? Because KStream-KStream, KTable-KTable, and KStream-KTable joins
are performed based on the keys of records (e.g., <code class="docutils literal"><span class="pre">leftRecord.key == rightRecord.key</span></code>), it is required that the
input streams/tables of a join are co-partitioned by key.</p>
<p>The only exception is
<a class="reference internal" href="#streams-developer-guide-dsl-joins-kstream-globalktable"><span class="std std-ref">KStream-GlobalKTable joins</span></a>. Here, co-partitioning is
not required because <em>all</em> partitions of the <code class="docutils literal"><span class="pre">GlobalKTable</span></code>‘s underlying changelog stream are made available to
each <code class="docutils literal"><span class="pre">KafkaStreams</span></code> instance, i.e. each instance has a full copy of the changelog stream. Further, a
<code class="docutils literal"><span class="pre">KeyValueMapper</span></code> allows for non-key based joins from the <code class="docutils literal"><span class="pre">KStream</span></code> to the <code class="docutils literal"><span class="pre">GlobalKTable</span></code>.</p>
<pclass="last"><strong>Kafka Streams partly verifies the co-partitioning requirement:</strong>
During the partition assignment step, i.e. at runtime, Kafka Streams verifies whether the number of partitions for
both sides of a join are the same. If they are not, a <codeclass="docutils literal"><spanclass="pre">TopologyBuilderException</span></code> (runtime exception) is being
thrown. Note that Kafka Streams cannot verify whether the partitioning strategy matches between the input
streams/tables of a join – it is up to the user to ensure that this is the case.</p>
</div>
<p><strong>Ensuring data co-partitioning:</strong> If the inputs of a join are not co-partitioned yet, you must ensure this manually.
You may follow a procedure such as outlined below.</p>
<olclass="arabic">
<li><pclass="first">Identify the input KStream/KTable in the join whose underlying Kafka topic has the smaller number of partitions.
Let’s call this stream/table “SMALLER”, and the other side of the join “LARGER”. To learn about the number of
partitions of a Kafka topic you can use, for example, the CLI tool <codeclass="docutils literal"><spanclass="pre">bin/kafka-topics</span></code> with the <codeclass="docutils literal"><spanclass="pre">--describe</span></code>
option.</p>
</li>
<li><pclass="first">Pre-create a new Kafka topic for “SMALLER” that has the same number of partitions as “LARGER”. Let’s call this
new topic “repartitioned-topic-for-smaller”. Typically, you’d use the CLI tool <codeclass="docutils literal"><spanclass="pre">bin/kafka-topics</span></code> with the
<codeclass="docutils literal"><spanclass="pre">--create</span></code> option for this.</p>
</li>
<li><pclass="first">Within your application, re-write the data of “SMALLER” into the new Kafka topic. You must ensure that, when writing
the data with <codeclass="docutils literal"><spanclass="pre">to</span></code> or <codeclass="docutils literal"><spanclass="pre">through</span></code>, the same partitioner is used as for “LARGER”.</p>
<blockquote>
<div><ulclass="simple">
<li>If “SMALLER” is a KStream: <codeclass="docutils literal"><spanclass="pre">KStream#to("repartitioned-topic-for-smaller")</span></code>.</li>
<li>If “SMALLER” is a KTable: <codeclass="docutils literal"><spanclass="pre">KTable#to("repartitioned-topic-for-smaller")</span></code>.</li>
</ul>
</div></blockquote>
</li>
<li><pclass="first">Within your application, re-read the data in “repartitioned-topic-for-smaller” into
a new KStream/KTable.</p>
<blockquote>
<div><ulclass="simple">
<li>If “SMALLER” is a KStream: <codeclass="docutils literal"><spanclass="pre">StreamsBuilder#stream("repartitioned-topic-for-smaller")</span></code>.</li>
<li>If “SMALLER” is a KTable: <codeclass="docutils literal"><spanclass="pre">StreamsBuilder#table("repartitioned-topic-for-smaller")</span></code>.</li>
</ul>
</div></blockquote>
</li>
<li><pclass="first">Within your application, perform the join between “LARGER” and the new stream/table.</p>
</li>
</ol>
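<p>As a sketch, the last three steps could look as follows in code. The topic names here are hypothetical, and the default partitioner is assumed on both sides:</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span>import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();

// Step 3: re-write "SMALLER" into the pre-created topic that has as many partitions as "LARGER".
KStream<String, String> smaller = builder.stream("smaller-input-topic");
smaller.to("repartitioned-topic-for-smaller");

// Step 4: re-read the repartitioned data into a new KStream.
KStream<String, String> smallerRepartitioned = builder.stream("repartitioned-topic-for-smaller");

// Step 5: perform the join between "LARGER" and the new stream.
KStream<String, String> larger = builder.stream("larger-input-topic");
KStream<String, String> joined = larger.join(smallerRepartitioned,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
    JoinWindows.of(TimeUnit.MINUTES.toMillis(5)));
</pre></div>
</div>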
</div>
<divclass="section"id="kstream-kstream-join">
<spanid="streams-developer-guide-dsl-joins-kstream-kstream"></span><h5><aclass="toc-backref"href="#id15">KStream-KStream Join</a><aclass="headerlink"href="#kstream-kstream-join"title="Permalink to this headline"></a></h5>
<p>KStream-KStream joins are always <aclass="reference internal"href="#windowing-sliding"><spanclass="std std-ref">windowed</span></a> joins, because otherwise the size of the
internal state store used to perform the join – e.g., a <aclass="reference internal"href="#windowing-sliding"><spanclass="std std-ref">sliding window</span></a> or “buffer” – would
grow indefinitely. For stream-stream joins it’s important to highlight that a new input record on one side will
produce a join output <em>for each</em> matching record on the other side, and there can be <em>multiple</em> such matching records
in a given join window (cf. the row with timestamp 15 in the join semantics table below, for example).</p>
<p>Join output records are effectively created as follows, leveraging the user-supplied <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code>:</p>
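<p>A pseudocode sketch (not actual API; <code>LV</code>, <code>RV</code>, and <code>JV</code> stand for the left, right, and joined value types):</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span>KeyValue<K, LV> leftRecord = ...;
KeyValue<K, RV> rightRecord = ...;
ValueJoiner<LV, RV, JV> joiner = ...;
KeyValue<K, JV> joinOutputRecord = KeyValue.pair(
    leftRecord.key, /* by definition, leftRecord.key == rightRecord.key */
    joiner.apply(leftRecord.value, rightRecord.value)
  );
</pre></div>
</div>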
<td><pclass="first">Performs an INNER JOIN of this stream with another stream.
Even though this operation is windowed, the joined stream will be of type <codeclass="docutils literal"><spanclass="pre">KStream<K,</span><spanclass="pre">...></span></code> rather than <codeclass="docutils literal"><spanclass="pre">KStream<Windowed<K>,</span><spanclass="pre">...></span></code>.
<p><strong>Data must be co-partitioned</strong>: The input data for both sides must be <aclass="reference internal"href="#streams-developer-guide-dsl-joins-co-partitioning"><spanclass="std std-ref">co-partitioned</span></a>.</p>
<p><strong>Causes data re-partitioning of a stream if and only if the stream was marked for re-partitioning (if both are marked, both are re-partitioned).</strong></p>
<p>Several variants of <codeclass="docutils literal"><spanclass="pre">join</span></code> exist; see the Javadocs for details.</p>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Long</span><spanclass="o">(),</span><spanclass="cm">/* left value */</span>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Double</span><spanclass="o">())</span><spanclass="cm">/* right value */</span>
<spanclass="o">);</span>
</pre></div>
</div>
<p>Detailed behavior:</p>
<ul>
<li><pclass="first">The join is <em>key-based</em>, i.e. with the join predicate <codeclass="docutils literal"><spanclass="pre">leftRecord.key</span><spanclass="pre">==</span><spanclass="pre">rightRecord.key</span></code>, and <em>window-based</em>, i.e. two input records are joined if and only if their
timestamps are “close” to each other as defined by the user-supplied <codeclass="docutils literal"><spanclass="pre">JoinWindows</span></code>, i.e. the window defines an additional join predicate over the record timestamps.</p>
</li>
<li><pclass="first">The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called to produce
join output records.</p>
<blockquote>
<div><ulclass="simple">
<li>Input records with a <codeclass="docutils literal"><spanclass="pre">null</span></code> key or a <codeclass="docutils literal"><spanclass="pre">null</span></code> value are ignored and do not trigger the join.</li>
</ul>
</div></blockquote>
</li>
</ul>
<pclass="last">See the semantics overview at the bottom of this section for a detailed description.</p>
<td><pclass="first">Performs a LEFT JOIN of this stream with another stream.
Even though this operation is windowed, the joined stream will be of type <codeclass="docutils literal"><spanclass="pre">KStream<K,</span><spanclass="pre">...></span></code> rather than <codeclass="docutils literal"><spanclass="pre">KStream<Windowed<K>,</span><spanclass="pre">...></span></code>.
<p><strong>Data must be co-partitioned</strong>: The input data for both sides must be <aclass="reference internal"href="#streams-developer-guide-dsl-joins-co-partitioning"><spanclass="std std-ref">co-partitioned</span></a>.</p>
<p><strong>Causes data re-partitioning of a stream if and only if the stream was marked for re-partitioning (if both are marked, both are re-partitioned).</strong></p>
<p>Several variants of <codeclass="docutils literal"><spanclass="pre">leftJoin</span></code> exist; see the Javadocs for details.</p>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Long</span><spanclass="o">(),</span><spanclass="cm">/* left value */</span>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Double</span><spanclass="o">())</span><spanclass="cm">/* right value */</span>
<spanclass="o">);</span>
</pre></div>
</div>
<p>Detailed behavior:</p>
<ul>
<li><pclass="first">The join is <em>key-based</em>, i.e. with the join predicate <codeclass="docutils literal"><spanclass="pre">leftRecord.key</span><spanclass="pre">==</span><spanclass="pre">rightRecord.key</span></code>, and <em>window-based</em>, i.e. two input records are joined if and only if their
timestamps are “close” to each other as defined by the user-supplied <codeclass="docutils literal"><spanclass="pre">JoinWindows</span></code>, i.e. the window defines an additional join predicate over the record timestamps.</p>
</li>
<li><pclass="first">The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called to produce
join output records.</p>
<blockquote>
<div><ulclass="simple">
<li>Input records with a <codeclass="docutils literal"><spanclass="pre">null</span></code> key or a <codeclass="docutils literal"><spanclass="pre">null</span></code> value are ignored and do not trigger the join.</li>
</ul>
</div></blockquote>
</li>
<li><pclass="first">For each input record on the left side that does not have any match on the right side, the <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called with <codeclass="docutils literal"><spanclass="pre">ValueJoiner#apply(leftRecord.value,</span><spanclass="pre">null)</span></code>;
this explains the row with timestamp=3 in the table below, which lists <codeclass="docutils literal"><spanclass="pre">[A,</span><spanclass="pre">null]</span></code> in the LEFT JOIN column.</p>
</li>
</ul>
<pclass="last">See the semantics overview at the bottom of this section for a detailed description.</p>
<td><pclass="first">Performs an OUTER JOIN of this stream with another stream.
Even though this operation is windowed, the joined stream will be of type <codeclass="docutils literal"><spanclass="pre">KStream<K,</span><spanclass="pre">...></span></code> rather than <codeclass="docutils literal"><spanclass="pre">KStream<Windowed<K>,</span><spanclass="pre">...></span></code>.
<p><strong>Data must be co-partitioned</strong>: The input data for both sides must be <aclass="reference internal"href="#streams-developer-guide-dsl-joins-co-partitioning"><spanclass="std std-ref">co-partitioned</span></a>.</p>
<p><strong>Causes data re-partitioning of a stream if and only if the stream was marked for re-partitioning (if both are marked, both are re-partitioned).</strong></p>
<p>Several variants of <codeclass="docutils literal"><spanclass="pre">outerJoin</span></code> exist; see the Javadocs for details.</p>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Long</span><spanclass="o">(),</span><spanclass="cm">/* left value */</span>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Double</span><spanclass="o">())</span><spanclass="cm">/* right value */</span>
<spanclass="o">);</span>
</pre></div>
</div>
<p>Detailed behavior:</p>
<ul>
<li><pclass="first">The join is <em>key-based</em>, i.e. with the join predicate <codeclass="docutils literal"><spanclass="pre">leftRecord.key</span><spanclass="pre">==</span><spanclass="pre">rightRecord.key</span></code>, and <em>window-based</em>, i.e. two input records are joined if and only if their
timestamps are “close” to each other as defined by the user-supplied <codeclass="docutils literal"><spanclass="pre">JoinWindows</span></code>, i.e. the window defines an additional join predicate over the record timestamps.</p>
</li>
<li><pclass="first">The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called to produce
join output records.</p>
<blockquote>
<div><ulclass="simple">
<li>Input records with a <codeclass="docutils literal"><spanclass="pre">null</span></code> key or a <codeclass="docutils literal"><spanclass="pre">null</span></code> value are ignored and do not trigger the join.</li>
</ul>
</div></blockquote>
</li>
<li><pclass="first">For each input record on one side that does not have any match on the other side, the <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called with <codeclass="docutils literal"><spanclass="pre">ValueJoiner#apply(leftRecord.value,</span><spanclass="pre">null)</span></code> or
<codeclass="docutils literal"><spanclass="pre">ValueJoiner#apply(null,</span><spanclass="pre">rightRecord.value)</span></code>, respectively; this explains the row with timestamp=3 in the table below, which lists <codeclass="docutils literal"><spanclass="pre">[A,</span><spanclass="pre">null]</span></code> in the OUTER JOIN column
(unlike LEFT JOIN, <codeclass="docutils literal"><spanclass="pre">[null,</span><spanclass="pre">x]</span></code> is possible, too, but no such example is shown in the table).</p>
</li>
</ul>
<pclass="last">See the semantics overview at the bottom of this section for a detailed description.</p>
</td>
</tr>
</tbody>
</table>
<p><strong>Semantics of stream-stream joins:</strong>
The semantics of the various stream-stream join variants are explained below.
To improve the readability of the table, assume that (1) all records have the same key (and thus the key in the table is omitted), (2) all records belong to a single join window, and (3) all records are processed in timestamp order.
The columns INNER JOIN, LEFT JOIN, and OUTER JOIN denote what is passed as arguments to the user-supplied
<aclass="reference external"href="../../../javadoc/org/apache/kafka/streams/kstream/ValueJoiner.html">ValueJoiner</a> for the <codeclass="docutils literal"><spanclass="pre">join</span></code>, <codeclass="docutils literal"><spanclass="pre">leftJoin</span></code>, and
<codeclass="docutils literal"><spanclass="pre">outerJoin</span></code> methods, respectively, whenever a new input record is received on either side of the join. An empty table
cell denotes that the <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> is not called at all.</p>
<tableborder="1"class="docutils">
<theadvalign="bottom">
<trclass="row-odd"><thclass="head">Timestamp</th>
<thclass="head">Left (KStream)</th>
<thclass="head">Right (KStream)</th>
<thclass="head">(INNER) JOIN</th>
<thclass="head">LEFT JOIN</th>
<thclass="head">OUTER JOIN</th>
</tr>
</thead>
<tbodyvalign="top">
<trclass="row-even"><td>1</td>
<td>null</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<trclass="row-odd"><td>2</td>
<td> </td>
<td>null</td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<trclass="row-even"><td>3</td>
<td>A</td>
<td> </td>
<td> </td>
<td>[A, null]</td>
<td>[A, null]</td>
</tr>
<trclass="row-odd"><td>4</td>
<td> </td>
<td>a</td>
<td>[A, a]</td>
<td>[A, a]</td>
<td>[A, a]</td>
</tr>
<trclass="row-even"><td>5</td>
<td>B</td>
<td> </td>
<td>[B, a]</td>
<td>[B, a]</td>
<td>[B, a]</td>
</tr>
<trclass="row-odd"><td>6</td>
<td> </td>
<td>b</td>
<td>[A, b], [B, b]</td>
<td>[A, b], [B, b]</td>
<td>[A, b], [B, b]</td>
</tr>
<trclass="row-even"><td>7</td>
<td>null</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<trclass="row-odd"><td>8</td>
<td> </td>
<td>null</td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<trclass="row-even"><td>9</td>
<td>C</td>
<td> </td>
<td>[C, a], [C, b]</td>
<td>[C, a], [C, b]</td>
<td>[C, a], [C, b]</td>
</tr>
<trclass="row-odd"><td>10</td>
<td> </td>
<td>c</td>
<td>[A, c], [B, c], [C, c]</td>
<td>[A, c], [B, c], [C, c]</td>
<td>[A, c], [B, c], [C, c]</td>
</tr>
<trclass="row-even"><td>11</td>
<td> </td>
<td>null</td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<trclass="row-odd"><td>12</td>
<td>null</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<trclass="row-even"><td>13</td>
<td> </td>
<td>null</td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<trclass="row-odd"><td>14</td>
<td> </td>
<td>d</td>
<td>[A, d], [B, d], [C, d]</td>
<td>[A, d], [B, d], [C, d]</td>
<td>[A, d], [B, d], [C, d]</td>
</tr>
<trclass="row-even"><td>15</td>
<td>D</td>
<td> </td>
<td>[D, a], [D, b], [D, c], [D, d]</td>
<td>[D, a], [D, b], [D, c], [D, d]</td>
<td>[D, a], [D, b], [D, c], [D, d]</td>
</tr>
</tbody>
</table>
</div>
<divclass="section"id="ktable-ktable-join">
<spanid="streams-developer-guide-dsl-joins-ktable-ktable"></span><h5><aclass="toc-backref"href="#id16">KTable-KTable Join</a><aclass="headerlink"href="#ktable-ktable-join"title="Permalink to this headline"></a></h5>
<p>KTable-KTable joins are always <em>non-windowed</em> joins. They are designed to be consistent with their counterparts in
relational databases. The changelog streams of both KTables are materialized into local state stores to represent the
latest snapshot of their table duals.
The join result is a new KTable that represents the changelog stream of the join operation.</p>
<p>Join output records are effectively created as follows, leveraging the user-supplied <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code>:</p>
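<p>In pseudocode, analogous to stream-stream joins:</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span>KeyValue<K, LV> leftRecord = ...;
KeyValue<K, RV> rightRecord = ...;
ValueJoiner<LV, RV, JV> joiner = ...;
KeyValue<K, JV> joinOutputRecord = KeyValue.pair(
    leftRecord.key, /* by definition, leftRecord.key == rightRecord.key */
    joiner.apply(leftRecord.value, rightRecord.value)
  );
</pre></div>
</div>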
<p><strong>Data must be co-partitioned</strong>: The input data for both sides must be <aclass="reference internal"href="#streams-developer-guide-dsl-joins-co-partitioning"><spanclass="std std-ref">co-partitioned</span></a>.</p>
<p>Detailed behavior:</p>
<ul>
<li><pclass="first">The join is <em>key-based</em>, i.e. with the join predicate <codeclass="docutils literal"><spanclass="pre">leftRecord.key</span><spanclass="pre">==</span><spanclass="pre">rightRecord.key</span></code>.</p>
</li>
<li><pclass="first">The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called to produce
join output records.</p>
<blockquote>
<div><ulclass="simple">
<li>Input records with a <codeclass="docutils literal"><spanclass="pre">null</span></code> key are ignored and do not trigger the join.</li>
<li>Input records with a <codeclass="docutils literal"><spanclass="pre">null</span></code> value are interpreted as <em>tombstones</em> for the corresponding key, which indicate the deletion of the key from the table. Tombstones do not
trigger the join. When an input tombstone is received, then an output tombstone is forwarded directly to the join result KTable if required (i.e. only if the corresponding
key actually exists already in the join result KTable).</li>
</ul>
</div></blockquote>
</li>
</ul>
<pclass="last">See the semantics overview at the bottom of this section for a detailed description.</p>
<p><strong>Data must be co-partitioned</strong>: The input data for both sides must be <aclass="reference internal"href="#streams-developer-guide-dsl-joins-co-partitioning"><spanclass="std std-ref">co-partitioned</span></a>.</p>
<p>Detailed behavior:</p>
<ul>
<li><pclass="first">The join is <em>key-based</em>, i.e. with the join predicate <codeclass="docutils literal"><spanclass="pre">leftRecord.key</span><spanclass="pre">==</span><spanclass="pre">rightRecord.key</span></code>.</p>
</li>
<li><pclass="first">The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called to produce
join output records.</p>
<blockquote>
<div><ulclass="simple">
<li>Input records with a <codeclass="docutils literal"><spanclass="pre">null</span></code> key are ignored and do not trigger the join.</li>
<li>Input records with a <codeclass="docutils literal"><spanclass="pre">null</span></code> value are interpreted as <em>tombstones</em> for the corresponding key, which indicate the deletion of the key from the table. Tombstones do not
trigger the join. When an input tombstone is received, then an output tombstone is forwarded directly to the join result KTable if required (i.e. only if the corresponding
key actually exists already in the join result KTable).</li>
</ul>
</div></blockquote>
</li>
<li><pclass="first">For each input record on the left side that does not have any match on the right side, the <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called with <codeclass="docutils literal"><spanclass="pre">ValueJoiner#apply(leftRecord.value,</span><spanclass="pre">null)</span></code>;
this explains the row with timestamp=3 in the table below, which lists <codeclass="docutils literal"><spanclass="pre">[A,</span><spanclass="pre">null]</span></code> in the LEFT JOIN column.</p>
</li>
</ul>
<pclass="last">See the semantics overview at the bottom of this section for a detailed description.</p>
<p><strong>Data must be co-partitioned</strong>: The input data for both sides must be <aclass="reference internal"href="#streams-developer-guide-dsl-joins-co-partitioning"><spanclass="std std-ref">co-partitioned</span></a>.</p>
<p>Detailed behavior:</p>
<ul>
<li><pclass="first">The join is <em>key-based</em>, i.e. with the join predicate <codeclass="docutils literal"><spanclass="pre">leftRecord.key</span><spanclass="pre">==</span><spanclass="pre">rightRecord.key</span></code>.</p>
</li>
<li><pclass="first">The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called to produce
join output records.</p>
<blockquote>
<div><ulclass="simple">
<li>Input records with a <codeclass="docutils literal"><spanclass="pre">null</span></code> key are ignored and do not trigger the join.</li>
<li>Input records with a <codeclass="docutils literal"><spanclass="pre">null</span></code> value are interpreted as <em>tombstones</em> for the corresponding key, which indicate the deletion of the key from the table. Tombstones do not
trigger the join. When an input tombstone is received, then an output tombstone is forwarded directly to the join result KTable if required (i.e. only if the corresponding
key actually exists already in the join result KTable).</li>
</ul>
</div></blockquote>
</li>
<li><pclass="first">For each input record on one side that does not have any match on the other side, the <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called with <codeclass="docutils literal"><spanclass="pre">ValueJoiner#apply(leftRecord.value,</span><spanclass="pre">null)</span></code> or
<codeclass="docutils literal"><spanclass="pre">ValueJoiner#apply(null,</span><spanclass="pre">rightRecord.value)</span></code>, respectively; this explains the rows with timestamp=3 and timestamp=7 in the table below, which list <codeclass="docutils literal"><spanclass="pre">[A,</span><spanclass="pre">null]</span></code> and
<codeclass="docutils literal"><spanclass="pre">[null,</span><spanclass="pre">b]</span></code>, respectively, in the OUTER JOIN column.</p>
</li>
</ul>
<pclass="last">See the semantics overview at the bottom of this section for a detailed description.</p>
</td>
</tr>
</tbody>
</table>
<p><strong>Semantics of table-table joins:</strong>
The semantics of the various table-table join variants are explained below.
To improve the readability of the table, you can assume that (1) all records have the same key (and thus the key in the table is omitted) and that (2) all records are processed in timestamp order.
The columns INNER JOIN, LEFT JOIN, and OUTER JOIN denote what is passed as arguments to the user-supplied
<aclass="reference external"href="../../../javadoc/org/apache/kafka/streams/kstream/ValueJoiner.html">ValueJoiner</a> for the <codeclass="docutils literal"><spanclass="pre">join</span></code>, <codeclass="docutils literal"><spanclass="pre">leftJoin</span></code>, and
<codeclass="docutils literal"><spanclass="pre">outerJoin</span></code> methods, respectively, whenever a new input record is received on either side of the join. An empty table
cell denotes that the <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> is not called at all.</p>
<spanid="streams-developer-guide-dsl-joins-kstream-ktable"></span><h5><aclass="toc-backref"href="#id17">KStream-KTable Join</a><aclass="headerlink"href="#kstream-ktable-join"title="Permalink to this headline"></a></h5>
<p>KStream-KTable joins are always <em>non-windowed</em> joins. They allow you to perform <em>table lookups</em> against a KTable
(changelog stream) upon receiving a new record from the KStream (record stream). An example use case would be to enrich
a stream of user activities (KStream) with the latest user profile information (KTable).</p>
<p>Join output records are effectively created as follows, leveraging the user-supplied <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code>:</p>
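<p>In pseudocode, analogous to the joins described above:</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span>KeyValue<K, LV> leftRecord = ...;
KeyValue<K, RV> rightRecord = ...;
ValueJoiner<LV, RV, JV> joiner = ...;
KeyValue<K, JV> joinOutputRecord = KeyValue.pair(
    leftRecord.key, /* by definition, leftRecord.key == rightRecord.key */
    joiner.apply(leftRecord.value, rightRecord.value)
  );
</pre></div>
</div>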
<p><strong>Data must be co-partitioned</strong>: The input data for both sides must be <aclass="reference internal"href="#streams-developer-guide-dsl-joins-co-partitioning"><spanclass="std std-ref">co-partitioned</span></a>.</p>
<p><strong>Causes data re-partitioning of the stream if and only if the stream was marked for re-partitioning.</strong></p>
<p>Several variants of <codeclass="docutils literal"><spanclass="pre">join</span></code> exist; see the Javadocs for details.</p>
<spanclass="o">.</span><spanclass="na">withValueSerde</span><spanclass="o">(</span><spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Long</span><spanclass="o">())</span><spanclass="cm">/* left value */</span>
<spanclass="o">.</span><spanclass="na">withValueSerde</span><spanclass="o">(</span><spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Long</span><spanclass="o">())</span><spanclass="cm">/* left value */</span>
<spanclass="o">);</span>
</pre></div>
</div>
<p>Detailed behavior:</p>
<ul>
<li><pclass="first">The join is <em>key-based</em>, i.e. with the join predicate <codeclass="docutils literal"><spanclass="pre">leftRecord.key</span><spanclass="pre">==</span><spanclass="pre">rightRecord.key</span></code>.</p>
</li>
<li><pclass="first">The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called to produce
join output records.</p>
<blockquote>
<div><ulclass="simple">
<li>Only input records for the left side (stream) trigger the join. Input records for the right side (table) update only the internal right-side join state.</li>
<li>Input records for the stream with a <codeclass="docutils literal"><spanclass="pre">null</span></code> key or a <codeclass="docutils literal"><spanclass="pre">null</span></code> value are ignored and do not trigger the join.</li>
<li>Input records for the table with a <codeclass="docutils literal"><spanclass="pre">null</span></code> value are interpreted as <em>tombstones</em> for the corresponding key, which indicate the deletion of the key from the table.
Tombstones do not trigger the join.</li>
</ul>
</div></blockquote>
</li>
</ul>
<pclass="last">See the semantics overview at the bottom of this section for a detailed description.</p>
<p><strong>Data must be co-partitioned</strong>: The input data for both sides must be <aclass="reference internal"href="#streams-developer-guide-dsl-joins-co-partitioning"><spanclass="std std-ref">co-partitioned</span></a>.</p>
<p><strong>Causes data re-partitioning of the stream if and only if the stream was marked for re-partitioning.</strong></p>
<p>Several variants of <codeclass="docutils literal"><spanclass="pre">leftJoin</span></code> exist; see the Javadocs for details.</p>
<spanclass="o">.</span><spanclass="na">withValueSerde</span><spanclass="o">(</span><spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Long</span><spanclass="o">())</span><spanclass="cm">/* left value */</span>
<spanclass="o">.</span><spanclass="na">withValueSerde</span><spanclass="o">(</span><spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Long</span><spanclass="o">())</span><spanclass="cm">/* left value */</span>
<spanclass="o">);</span>
</pre></div>
</div>
<p>Detailed behavior:</p>
<ul>
<li><pclass="first">The join is <em>key-based</em>, i.e. with the join predicate <codeclass="docutils literal"><spanclass="pre">leftRecord.key</span><spanclass="pre">==</span><spanclass="pre">rightRecord.key</span></code>.</p>
</li>
<li><pclass="first">The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called to produce
join output records.</p>
<blockquote>
<div><ulclass="simple">
<li>Only input records for the left side (stream) trigger the join. Input records for the right side (table) update only the internal right-side join state.</li>
<li>Input records for the stream with a <codeclass="docutils literal"><spanclass="pre">null</span></code> key or a <codeclass="docutils literal"><spanclass="pre">null</span></code> value are ignored and do not trigger the join.</li>
<li>Input records for the table with a <codeclass="docutils literal"><spanclass="pre">null</span></code> value are interpreted as <em>tombstones</em> for the corresponding key, which indicate the deletion of the key from the table.
Tombstones do not trigger the join.</li>
</ul>
</div></blockquote>
</li>
<li><pclass="first">For each input record on the left side that does not have any match on the right side, the <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called with <codeclass="docutils literal"><spanclass="pre">ValueJoiner#apply(leftRecord.value,</span><spanclass="pre">null)</span></code>;
this explains the row with timestamp=3 in the table below, which lists <codeclass="docutils literal"><spanclass="pre">[A,</span><spanclass="pre">null]</span></code> in the LEFT JOIN column.</p>
</li>
</ul>
<pclass="last">See the semantics overview at the bottom of this section for a detailed description.</p>
</td>
</tr>
</tbody>
</table>
<p><strong>Semantics of stream-table joins:</strong>
The semantics of the various stream-table join variants are explained below.
To improve the readability of the table we assume that (1) all records have the same key (and thus we omit the key in
the table) and that (2) all records are processed in timestamp order.
The columns INNER JOIN and LEFT JOIN denote what is passed as arguments to the user-supplied
<aclass="reference external"href="../../../javadoc/org/apache/kafka/streams/kstream/ValueJoiner.html">ValueJoiner</a> for the <codeclass="docutils literal"><spanclass="pre">join</span></code> and <codeclass="docutils literal"><spanclass="pre">leftJoin</span></code>
<spanid="streams-developer-guide-dsl-joins-kstream-globalktable"></span><h5><aclass="toc-backref"href="#id18">KStream-GlobalKTable Join</a><aclass="headerlink"href="#kstream-globalktable-join"title="Permalink to this headline"></a></h5>
<p>KStream-GlobalKTable joins are always <em>non-windowed</em> joins. They allow you to perform <em>table lookups</em> against a
<aclass="reference internal"href="#streams_concepts_globalktable"><spanclass="std std-ref">GlobalKTable</span></a> (entire changelog stream) upon receiving a new record from the
KStream (record stream). An example use case would be “star queries” or “star joins”, where you would enrich a stream
of user activities (KStream) with the latest user profile information (GlobalKTable) and further context information
(further GlobalKTables).</p>
<p>At a high-level, KStream-GlobalKTable joins are very similar to
<aclass="reference internal"href="#streams-developer-guide-dsl-joins-kstream-ktable"><spanclass="std std-ref">KStream-KTable joins</span></a>. However, global tables provide you
with much more flexibility at <aclass="reference internal"href="#streams_concepts_globalktable"><spanclass="std std-ref">some expense</span></a> when compared to partitioned tables (KTable):</p>
<ulclass="simple">
<li>They do not require <aclass="reference internal"href="#streams-developer-guide-dsl-joins-co-partitioning"><spanclass="std std-ref">data co-partitioning</span></a>.</li>
<li>They allow for efficient “star joins”; i.e., joining a large-scale “facts” stream against “dimension” tables.</li>
<li>They allow for joining against foreign keys; i.e., you can look up data in the table not just by the keys of records in the
stream, but also by data in the record values.</li>
<li>They make many use cases feasible where you must work on heavily skewed data and thus suffer from hot partitions.</li>
<li>They are often more efficient than their partitioned KTable counterpart when you need to perform multiple joins in
succession.</li>
</ul>
<p>Join output records are effectively created as follows, leveraging the user-supplied <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code>:</p>
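<p>A pseudocode sketch; <code>GK</code> stands for the key type of the global table, into whose keyspace the stream record is first mapped:</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span>KeyValueMapper<K, LV, GK> keyValueMapper = ...;
ValueJoiner<LV, RV, JV> joiner = ...;
KeyValue<K, LV> leftRecord = ...;
KeyValue<GK, RV> rightRecord = ...; /* where rightRecord.key == keyValueMapper.apply(leftRecord.key, leftRecord.value) */
KeyValue<K, JV> joinOutputRecord = KeyValue.pair(
    leftRecord.key,
    joiner.apply(leftRecord.value, rightRecord.value)
  );
</pre></div>
</div>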
<p>The <codeclass="docutils literal"><spanclass="pre">GlobalKTable</span></code> is fully bootstrapped upon (re)start of a <codeclass="docutils literal"><spanclass="pre">KafkaStreams</span></code> instance, which means the table is fully populated with all the data in the underlying topic that is
available at the time of the startup. The actual data processing begins only once the bootstrapping has completed.</p>
<p><strong>Causes data re-partitioning of the stream if and only if the stream was marked for re-partitioning.</strong></p>
<spanclass="o">(</span><spanclass="n">leftKey</span><spanclass="o">,</span><spanclass="n">leftValue</span><spanclass="o">)</span><spanclass="o">-></span><spanclass="n">leftKey</span><spanclass="o">.</span><spanclass="na">length</span><spanclass="o">(),</span><spanclass="cm">/* derive a (potentially) new key by which to lookup against the table */</span>
<spanclass="k">new</span><spanclass="n">KeyValueMapper</span><spanclass="o"><</span><spanclass="n">String</span><spanclass="o">,</span><spanclass="n">Long</span><spanclass="o">,</span><spanclass="n">Integer</span><spanclass="o">>()</span><spanclass="o">{</span><spanclass="cm">/* derive a (potentially) new key by which to lookup against the table */</span>
<li><pclass="first">The join is indirectly <em>key-based</em>, i.e. with the join predicate <codeclass="docutils literal"><spanclass="pre">KeyValueMapper#apply(leftRecord.key,</span><spanclass="pre">leftRecord.value)</span><spanclass="pre">==</span><spanclass="pre">rightRecord.key</span></code>.</p>
</li>
<li><pclass="first">The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called to produce
join output records.</p>
<blockquote>
<div><ulclass="simple">
<li>Only input records for the left side (stream) trigger the join. Input records for the right side (table) update only the internal right-side join state.</li>
<li>Input records for the stream with a <codeclass="docutils literal"><spanclass="pre">null</span></code> key or a <codeclass="docutils literal"><spanclass="pre">null</span></code> value are ignored and do not trigger the join.</li>
<li>Input records for the table with a <codeclass="docutils literal"><spanclass="pre">null</span></code> value are interpreted as <em>tombstones</em>, which indicate the deletion of a record key from the table. Tombstones do not trigger the
join.</li>
</ul>
</div></blockquote>
</li>
</ul>
<pclass="last">See the semantics overview at the bottom of this section for a detailed description.</p>
<p>The <codeclass="docutils literal"><spanclass="pre">GlobalKTable</span></code> is fully bootstrapped upon (re)start of a <codeclass="docutils literal"><spanclass="pre">KafkaStreams</span></code> instance, which means the table is fully populated with all the data in the underlying topic that is
available at the time of the startup. The actual data processing begins only once the bootstrapping has completed.</p>
<p><strong>Causes data re-partitioning of the stream if and only if the stream was marked for re-partitioning.</strong></p>
<spanclass="o">(</span><spanclass="n">leftKey</span><spanclass="o">,</span><spanclass="n">leftValue</span><spanclass="o">)</span><spanclass="o">-></span><spanclass="n">leftKey</span><spanclass="o">.</span><spanclass="na">length</span><spanclass="o">(),</span><spanclass="cm">/* derive a (potentially) new key by which to lookup against the table */</span>
<spanclass="k">new</span><spanclass="n">KeyValueMapper</span><spanclass="o"><</span><spanclass="n">String</span><spanclass="o">,</span><spanclass="n">Long</span><spanclass="o">,</span><spanclass="n">Integer</span><spanclass="o">>()</span><spanclass="o">{</span><spanclass="cm">/* derive a (potentially) new key by which to lookup against the table */</span>
<li><pclass="first">The join is indirectly <em>key-based</em>, i.e. with the join predicate <codeclass="docutils literal"><spanclass="pre">KeyValueMapper#apply(leftRecord.key,</span><spanclass="pre">leftRecord.value)</span><spanclass="pre">==</span><spanclass="pre">rightRecord.key</span></code>.</p>
</li>
<li><pclass="first">The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called to produce
join output records.</p>
<blockquote>
<div><ulclass="simple">
<li>Only input records for the left side (stream) trigger the join. Input records for the right side (table) update only the internal right-side join state.</li>
<li>Input records for the stream with a <codeclass="docutils literal"><spanclass="pre">null</span></code> key or a <codeclass="docutils literal"><spanclass="pre">null</span></code> value are ignored and do not trigger the join.</li>
<li>Input records for the table with a <codeclass="docutils literal"><spanclass="pre">null</span></code> value are interpreted as <em>tombstones</em>, which indicate the deletion of a record key from the table. Tombstones do not trigger the
join.</li>
</ul>
</div></blockquote>
</li>
<li><pclass="first">For each input record on the left side that does not have any match on the right side, the <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called with <codeclass="docutils literal"><spanclass="pre">ValueJoiner#apply(leftRecord.value,</span><spanclass="pre">null)</span></code>.</p>
</li>
</ul>
</td>
</tr>
</tbody>
</table>
<p><strong>Semantics of stream-table joins:</strong>
The join semantics are identical to <aclass="reference internal"href="#streams-developer-guide-dsl-joins-kstream-ktable"><spanclass="std std-ref">KStream-KTable joins</span></a>.
The only difference is that, for KStream-GlobalKTable joins, the left input record is first “mapped” with
a user-supplied <codeclass="docutils literal"><spanclass="pre">KeyValueMapper</span></code> into the table’s keyspace prior to the table lookup.</p>
</div>
</div>
<divclass="section"id="windowing">
<spanid="streams-developer-guide-dsl-windowing"></span><h4><aclass="toc-backref"href="#id19">Windowing</a><aclass="headerlink"href="#windowing"title="Permalink to this headline"></a></h4>
<p>Windowing lets you control how to group records that have the same key for stateful operations such as
<aclass="reference internal"href="#streams-developer-guide-dsl-aggregating"><spanclass="std std-ref">aggregations</span></a> or <aclass="reference internal"href="#streams-developer-guide-dsl-joins"><spanclass="std std-ref">joins</span></a> into
so-called windows. Windows are tracked per record key.</p>
<pclass="last">A related operation is <aclass="reference internal"href="#streams-developer-guide-dsl-transformations-stateless"><spanclass="std std-ref">grouping</span></a>, which groups all
records that have the same key to ensure that data is properly partitioned (“keyed”) for subsequent operations.
Once grouped, windowing allows you to further sub-group the records of a key.</p>
</div>
<p>For example, in join operations, a windowing state store is used to store all the records received so far within the
defined window boundary. In aggregating operations, a windowing state store is used to store the latest aggregation
results per window.
Old records in the state store are purged after the specified window retention period.
Kafka Streams guarantees to keep a window for at least this specified time; the default value is one day and can be
changed via <codeclass="docutils literal"><spanclass="pre">Windows#until()</span></code> and <codeclass="docutils literal"><spanclass="pre">SessionWindows#until()</span></code>.</p>
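<p>For example, a sketch of increasing the retention of 5-minute windows to two days:</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span>import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.kstream.TimeWindows;

// Keep the windowed data for at least 2 days before it may be purged;
// the retention must be at least as large as the window size itself.
TimeWindows.of(TimeUnit.MINUTES.toMillis(5)).until(TimeUnit.DAYS.toMillis(2));
</pre></div>
</div>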
<p>The DSL supports the following types of windows: tumbling time windows, hopping time windows, sliding time
windows, and session windows.</p>
<spanid="windowing-tumbling"></span><h5><aclass="toc-backref"href="#id20">Tumbling time windows</a><aclass="headerlink"href="#tumbling-time-windows"title="Permalink to this headline"></a></h5>
<p>Tumbling time windows are a special case of hopping time windows and, like the latter, are windows based on time
intervals. They model fixed-size, non-overlapping, gap-less windows.
A tumbling window is defined by a single property: the window’s <em>size</em>.
A tumbling window is a hopping window whose window size is equal to its advance interval.
Since tumbling windows never overlap, a data record will belong to one and only one window.</p>
<pclass="caption"><spanclass="caption-text">This diagram shows windowing a stream of data records with tumbling windows. Windows do not overlap because, by
definition, the advance interval is identical to the window size. In this diagram the time numbers represent minutes;
e.g. t=5 means “at the five-minute mark”. In reality, the unit of time in Kafka Streams is milliseconds, which means
the time numbers would need to be multiplied with 60 * 1,000 to convert from minutes to milliseconds (e.g. t=5 would
become t=300,000).</span></p>
</div>
<p>Tumbling time windows are <em>aligned to the epoch</em>, with the lower interval bound being inclusive and the upper bound
being exclusive. “Aligned to the epoch” means that the first window starts at timestamp zero. For example, tumbling
windows with a size of 5000ms have predictable window boundaries <codeclass="docutils literal"><spanclass="pre">[0;5000),[5000;10000),...</span></code>— and <strong>not</strong>
<codeclass="docutils literal"><spanclass="pre">[1000;6000),[6000;11000),...</span></code> or even something “random” like <codeclass="docutils literal"><spanclass="pre">[1452;6452),[6452;11452),...</span></code>.</p>
<p>The following code defines a tumbling window with a size of 5 minutes:</p>
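<p>For example (a sketch using the <code>TimeWindows</code> helper):</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span>import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.kstream.TimeWindows;

// A tumbling time window with a size of 5 minutes (and, by definition, an implicit
// advance interval of 5 minutes).
long windowSizeMs = TimeUnit.MINUTES.toMillis(5);
TimeWindows.of(windowSizeMs);

// The above is equivalent to the following code:
TimeWindows.of(windowSizeMs).advanceBy(windowSizeMs);
</pre></div>
</div>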
<spanid="windowing-hopping"></span><h5><aclass="toc-backref"href="#id21">Hopping time windows</a><aclass="headerlink"href="#hopping-time-windows"title="Permalink to this headline"></a></h5>
<p>Hopping time windows are windows based on time intervals. They model fixed-sized, (possibly) overlapping windows.
A hopping window is defined by two properties: the window’s <em>size</em> and its <em>advance interval</em> (aka “hop”). The advance
interval specifies by how much a window moves forward relative to the previous one. For example, you can configure a
hopping window with a size of 5 minutes and an advance interval of 1 minute. Since hopping windows can overlap – and in
general they do – a data record may belong to more than one such window.</p>
<pclass="caption"><spanclass="caption-text">This diagram shows windowing a stream of data records with hopping windows. In this diagram the time numbers
represent minutes; e.g. t=5 means “at the five-minute mark”. In reality, the unit of time in Kafka Streams is
milliseconds, which means the time numbers would need to be multiplied with 60 * 1,000 to convert from minutes to
milliseconds (e.g. t=5 would become t=300,000).</span></p>
</div>
<p>Hopping time windows are <em>aligned to the epoch</em>, with the lower interval bound being inclusive and the upper bound
being exclusive. “Aligned to the epoch” means that the first window starts at timestamp zero. For example, hopping
windows with a size of 5000ms and an advance interval (“hop”) of 3000ms have predictable window boundaries
<codeclass="docutils literal"><spanclass="pre">[0;5000),[3000;8000),...</span></code>— and <strong>not</strong><codeclass="docutils literal"><spanclass="pre">[1000;6000),[4000;9000),...</span></code> or even something “random” like
<p>Unlike non-windowed aggregates that we have seen previously, windowed aggregates return a <em>windowed KTable</em> whose key
type is <codeclass="docutils literal"><spanclass="pre">Windowed<K></span></code>. This is to differentiate aggregate values with the same key from different windows. The
corresponding window instance and the embedded key can be retrieved as <codeclass="docutils literal"><spanclass="pre">Windowed#window()</span></code> and <codeclass="docutils literal"><spanclass="pre">Windowed#key()</span></code>,
respectively.</p>
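<p>For example, a sketch of a windowed count whose result is keyed by <code>Windowed&lt;String&gt;</code>; the example assumes a Kafka version that provides <code>KGroupedStream#windowedBy()</code>:</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span>KStream<String, Long> stream = ...;

// A windowed count; the result is keyed by the record key plus the window it falls into.
KTable<Windowed<String>, Long> counted = stream
    .groupByKey()
    .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
    .count();

counted.toStream().foreach((windowedKey, count) -> {
    String key = windowedKey.key();                     // the embedded record key
    long windowStartMs = windowedKey.window().start();  // the corresponding window instance
    // ...
});
</pre></div>
</div>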
</div>
<divclass="section"id="sliding-time-windows">
<spanid="windowing-sliding"></span><h5><aclass="toc-backref"href="#id22">Sliding time windows</a><aclass="headerlink"href="#sliding-time-windows"title="Permalink to this headline"></a></h5>
<p>Sliding windows are actually quite different from hopping and tumbling windows. In Kafka Streams, sliding windows
are used only for <aclass="reference internal"href="#streams-developer-guide-dsl-joins"><spanclass="std std-ref">join operations</span></a>, and can be specified through the
<codeclass="docutils literal"><spanclass="pre">JoinWindows</span></code> class.</p>
<p>A sliding window models a fixed-size window that slides continuously over the time axis; here, two data records are
said to be included in the same window if (in the case of symmetric windows) the difference of their timestamps is
within the window size. Thus, sliding windows are not aligned to the epoch, but to the data record timestamps. In
contrast to hopping and tumbling windows, the lower and upper window time interval bounds of sliding windows are
<em>both inclusive</em>.</p>
</div>
<divclass="section"id="session-windows">
<spanid="windowing-session"></span><h5><aclass="toc-backref"href="#id23">Session Windows</a><aclass="headerlink"href="#session-windows"title="Permalink to this headline"></a></h5>
<p>Session windows are used to aggregate key-based events into so-called <em>sessions</em>, the process of which is referred to
as <em>sessionization</em>. Sessions represent a <strong>period of activity</strong> separated by a defined <strong>gap of inactivity</strong> (or
“idleness”). Any events processed that fall within the inactivity gap of any existing sessions are merged into the
existing sessions. If an event falls outside of the session gap, then a new session will be created.</p>
<p>Session windows are different from the other window types in that:</p>
<ulclass="simple">
<li>all windows are tracked independently across keys – e.g. windows of different keys typically have different start
and end times</li>
<li>their window sizes vary – even windows for the same key typically have different sizes</li>
</ul>
<p>The prime area of application for session windows is <strong>user behavior analysis</strong>. Session-based analyses can range from
simple metrics (e.g. count of user visits on a news website or social platform) to more complex metrics (e.g. customer
conversion funnel and event flows).</p>
<p>The following code defines a session window with an inactivity gap of 5 minutes:</p>
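<p>For example (a sketch using the <code>SessionWindows</code> helper):</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span>import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.kstream.SessionWindows;

// A session window with an inactivity gap of 5 minutes.
SessionWindows.with(TimeUnit.MINUTES.toMillis(5));
</pre></div>
</div>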
<pclass="caption"><spanclass="caption-text">Detected sessions after having received three input records: two records for the green record key at t=0 and t=6, and
one record for the blue record key at t=2.
In this diagram the time numbers represent minutes; e.g. t=5 means “at the five-minute mark”. In reality, the unit
of time in Kafka Streams is milliseconds, which means the time numbers would need to be multiplied with 60 * 1,000 to
convert from minutes to milliseconds (e.g. t=5 would become t=300,000).</span></p>
</div>
<p>If we then receive three additional records (including two late-arriving records), what would happen is that the two
existing sessions for the green record key will be merged into a single session starting at time 0 and ending at time 6,
consisting of a total of three records. The existing session for the blue record key will be extended to end at time 5,
consisting of a total of two records. And, finally, there will be a new session for the blue key starting and ending at
time 11, consisting of exactly one record.</p>
<pclass="caption"><spanclass="caption-text">Detected sessions after having received six input records. Note the two late-arriving data records at t=4 (green) and
t=5 (blue), which lead to a merge of sessions and an extension of a session, respectively.</span></p>
<spanid="streams-developer-guide-dsl-process"></span><h3><aclass="toc-backref"href="#id24">Applying processors and transformers (Processor API integration)</a><aclass="headerlink"href="#applying-processors-and-transformers-processor-api-integration"title="Permalink to this headline"></a></h3>
<p>Beyond the aforementioned <aclass="reference internal"href="#streams-developer-guide-dsl-transformations-stateless"><spanclass="std std-ref">stateless</span></a> and
<aclass="reference internal"href="#streams-developer-guide-dsl-transformations-stateless"><spanclass="std std-ref">stateful</span></a> transformations, you may also
leverage the <aclass="reference internal"href="processor-api.html#streams-developer-guide-processor-api"><spanclass="std std-ref">Processor API</span></a> from the DSL.
There are a number of scenarios where this may be helpful:</p>
<ulclass="simple">
<li><strong>Customization:</strong> You need to implement special, customized logic that is not or not yet available in the DSL.</li>
<li><strong>Combining ease-of-use with full flexibility where it’s needed:</strong> Even though you generally prefer to use
the expressiveness of the DSL, there are certain steps in your processing that require more flexibility and
tinkering than the DSL provides. For example, only the Processor API provides access to a record’s metadata such as its topic, partition, and offset information.</li>
</ul>
<td><pclass="first"><strong>Terminal operation.</strong> Applies a <codeclass="docutils literal"><spanclass="pre">Processor</span></code> to each record.
<codeclass="docutils literal"><spanclass="pre">process()</span></code> allows you to leverage the <aclass="reference internal"href="processor-api.html#streams-developer-guide-processor-api"><spanclass="std std-ref">Processor API</span></a> from the DSL.
<p>This is essentially equivalent to adding the <codeclass="docutils literal"><spanclass="pre">Processor</span></code> via <codeclass="docutils literal"><spanclass="pre">Topology#addProcessor()</span></code> to your
<aclass="reference internal"href="../core-concepts.html#streams_topology"><spanclass="std std-ref">processor topology</span></a>.</p>
<td><pclass="first">Applies a <codeclass="docutils literal"><spanclass="pre">Transformer</span></code> to each record.
<codeclass="docutils literal"><spanclass="pre">transform()</span></code> allows you to leverage the <aclass="reference internal"href="processor-api.html#streams-developer-guide-processor-api"><spanclass="std std-ref">Processor API</span></a> from the DSL.
<p>Each input record is transformed into zero, one, or more output records (similar to the stateless <codeclass="docutils literal"><spanclass="pre">flatMap</span></code>).
The <codeclass="docutils literal"><spanclass="pre">Transformer</span></code> must return <codeclass="docutils literal"><spanclass="pre">null</span></code> for zero output.
You can modify the record’s key and value, including their types.</p>
<p><strong>Marks the stream for data re-partitioning:</strong>
Applying a grouping or a join after <codeclass="docutils literal"><spanclass="pre">transform</span></code> will result in re-partitioning of the records.
If possible use <codeclass="docutils literal"><spanclass="pre">transformValues</span></code> instead, which will not cause data re-partitioning.</p>
<p><codeclass="docutils literal"><spanclass="pre">transform</span></code> is essentially equivalent to adding the <codeclass="docutils literal"><spanclass="pre">Transformer</span></code> via <codeclass="docutils literal"><spanclass="pre">Topology#addProcessor()</span></code> to your
<td><pclass="first">Applies a <codeclass="docutils literal"><spanclass="pre">ValueTransformer</span></code> to each record, while retaining the key of the original record.
<codeclass="docutils literal"><spanclass="pre">transformValues()</span></code> allows you to leverage the <aclass="reference internal"href="processor-api.html#streams-developer-guide-processor-api"><spanclass="std std-ref">Processor API</span></a> from the DSL.
<p>Each input record is transformed into exactly one output record (zero output records or multiple output records are not possible).
The <codeclass="docutils literal"><spanclass="pre">ValueTransformer</span></code> may return <codeclass="docutils literal"><spanclass="pre">null</span></code> as the new value for a record.</p>
<p><codeclass="docutils literal"><spanclass="pre">transformValues</span></code> is preferable to <codeclass="docutils literal"><spanclass="pre">transform</span></code> because it will not cause data re-partitioning.</p>
<p><codeclass="docutils literal"><spanclass="pre">transformValues</span></code> is essentially equivalent to adding the <codeclass="docutils literal"><spanclass="pre">ValueTransformer</span></code> via <codeclass="docutils literal"><spanclass="pre">Topology#addProcessor()</span></code> to your
<p>The following example shows how to leverage, via the <codeclass="docutils literal"><spanclass="pre">KStream#process()</span></code> method, a custom <codeclass="docutils literal"><spanclass="pre">Processor</span></code> that sends an
email notification whenever a page view count reaches a predefined threshold.</p>
<p>First, we need to implement a custom stream processor, <codeclass="docutils literal"><spanclass="pre">PopularPageEmailAlert</span></code>, that implements the <codeclass="docutils literal"><spanclass="pre">Processor</span></code>
interface:</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span><spanclass="c1">// A processor that sends an alert message about a popular page to a configurable email address</span>
<pclass="last">Even though we do not demonstrate it in this example, a stream processor can access any available state stores by
calling <codeclass="docutils literal"><spanclass="pre">ProcessorContext#getStateStore()</span></code>. The only state stores available are (1) those that have been named in the
corresponding <codeclass="docutils literal"><spanclass="pre">KStream#process()</span></code> method call (note that this is a different method than <codeclass="docutils literal"><spanclass="pre">Processor#process()</span></code>)
and (2) all global stores. Global stores do not need to be attached explicitly; however, they
allow read-only access only.</p>
</div>
<p>Then we can leverage the <codeclass="docutils literal"><spanclass="pre">PopularPageEmailAlert</span></code> processor in the DSL via <codeclass="docutils literal"><spanclass="pre">KStream#process</span></code>.</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span><spanclass="c1">// Send an email notification when the view count of a page reaches one thousand.</span>
<spanid="streams-developer-guide-dsl-destinations"></span><h2><aclass="toc-backref"href="#id25">Writing streams back to Kafka</a><aclass="headerlink"href="#writing-streams-back-to-kafka"title="Permalink to this headline"></a></h2>
<p>Any streams and tables may be (continuously) written back to a Kafka topic. As we will describe in more detail below, the output data might be
re-partitioned on its way to Kafka, depending on the situation.</p>
<li>If you do not specify SerDes explicitly, the default SerDes from the
<aclass="reference internal"href="config-streams.html#streams-developer-guide-configuration"><spanclass="std std-ref">configuration</span></a> are used.</li>
<li>You <strong>must specify SerDes explicitly</strong> via the <codeclass="docutils literal"><spanclass="pre">Produced</span></code> class if the key and/or value types of the
<codeclass="docutils literal"><spanclass="pre">KStream</span></code> do not match the configured default SerDes.</li>
<li>See <aclass="reference internal"href="datatypes.html#streams-developer-guide-serdes"><spanclass="std std-ref">Data Types and Serialization</span></a> for information about configuring default SerDes, available SerDes,
and implementing your own custom SerDes.</li>
</ul>
<p>A variant of <codeclass="docutils literal"><spanclass="pre">to</span></code> exists that enables you to specify how the data is produced by using a <codeclass="docutils literal"><spanclass="pre">Produced</span></code>
instance to specify, for example, a <codeclass="docutils literal"><spanclass="pre">StreamPartitioner</span></code> that gives you control over
how output records are distributed across the partitions of the output topic.</p>
<p>Another variant of <codeclass="docutils literal"><spanclass="pre">to</span></code> exists that enables you to dynamically choose which topic to send to for each record via a <codeclass="docutils literal"><spanclass="pre">TopicNameExtractor</span></code> instance.</p>
<p><strong>Causes data re-partitioning if any of the following conditions is true:</strong></p>
<olclass="last arabic simple">
<li>If the output topic has a different number of partitions than the stream/table.</li>
<li>If the <codeclass="docutils literal"><spanclass="pre">KStream</span></code> was marked for re-partitioning.</li>
<li>If you provide a custom <codeclass="docutils literal"><spanclass="pre">StreamPartitioner</span></code> to explicitly control how to distribute the output records
across the partitions of the output topic.</li>
<li>If the key of an output record is <codeclass="docutils literal"><spanclass="pre">null</span></code>.</li>
</ol>
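<p>For example, a sketch of writing a stream to an output topic (the topic name is hypothetical):</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span>import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

KStream<String, Long> stream = ...;

// Write the stream to the output topic, using the configured default key and value serdes.
stream.to("my-stream-output-topic");

// Write the stream to the output topic, using explicit key and value serdes.
stream.to("my-stream-output-topic", Produced.with(Serdes.String(), Serdes.Long()));
</pre></div>
</div>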
<td><pclass="first">Write the records to a Kafka topic and create a new stream/table from that topic.
Essentially a shorthand for <codeclass="docutils literal"><spanclass="pre">KStream#to()</span></code> followed by <codeclass="docutils literal"><spanclass="pre">StreamsBuilder#stream()</span></code>, same for tables.
<li>If you do not specify SerDes explicitly, the default SerDes from the
<aclass="reference internal"href="config-streams.html#streams-developer-guide-configuration"><spanclass="std std-ref">configuration</span></a> are used.</li>
<li>You <strong>must specify SerDes explicitly</strong> if the key and/or value types of the <codeclass="docutils literal"><spanclass="pre">KStream</span></code> or <codeclass="docutils literal"><spanclass="pre">KTable</span></code> do not
match the configured default SerDes.</li>
<li>See <aclass="reference internal"href="datatypes.html#streams-developer-guide-serdes"><spanclass="std std-ref">Data Types and Serialization</span></a> for information about configuring default SerDes, available SerDes,
and implementing your own custom SerDes.</li>
</ul>
<p>A variant of <codeclass="docutils literal"><spanclass="pre">through</span></code> exists that lets you control how the data is produced: you pass a <codeclass="docutils literal"><spanclass="pre">Produced</span></code>
instance that can specify, for example, a <codeclass="docutils literal"><spanclass="pre">StreamPartitioner</span></code> that gives you control over
how output records are distributed across the partitions of the output topic. A sketch follows the list below.</p>
<p><strong>Causes data re-partitioning if any of the following conditions is true:</strong></p>
<olclass="last arabic simple">
<li>If the output topic has a different number of partitions than the stream/table.</li>
<li>If the <codeclass="docutils literal"><spanclass="pre">KStream</span></code> was marked for re-partitioning.</li>
<li>If you provide a custom <codeclass="docutils literal"><spanclass="pre">StreamPartitioner</span></code> to explicitly control how to distribute the output records
across the partitions of the output topic.</li>
<li>If the key of an output record is <codeclass="docutils literal"><spanclass="pre">null</span></code>.</li>
</ol>
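<p>For example, a minimal sketch of <codeclass="docutils literal"><spanclass="pre">through</span></code> with explicit SerDes, reusing the <codeclass="docutils literal"><spanclass="pre">stream</span></code> from the earlier sketch (the intermediate topic name is hypothetical):</p>
<preclass="brush: java;">
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

// Materialize the stream to an intermediate topic and continue
// processing from the newly created stream in a single step.
KStream&lt;String, Long&gt; continued = stream.through("intermediate-topic",
    Produced.with(Serdes.String(), Serdes.Long()));
</pre>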
<pclass="last"><strong>When you want to write to systems other than Kafka:</strong>
Besides writing the data back to Kafka, you can also apply a
<aclass="reference internal"href="#streams-developer-guide-dsl-process"><spanclass="std std-ref">custom processor</span></a> as a stream sink at the end of the processing to, for
example, write to external databases. First, doing so is not a recommended pattern – we strongly suggest to use the
<aclass="reference internal"href="../../connect/index.html#kafka-connect"><spanclass="std std-ref">Kafka Connect API</span></a> instead. However, if you do use such a sink processor, please be aware that
it is now your responsibility to guarantee message delivery semantics when talking to such external systems (e.g., to
retry on delivery failure or to prevent message duplication).</p>
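<p>If you nevertheless implement such a sink, a sketch using <codeclass="docutils literal"><spanclass="pre">foreach</span></code> could look like the following; the database client is entirely hypothetical, and retries and deduplication are your responsibility:</p>
<preclass="brush: java;">
// ExternalDbClient stands in for whatever client library you use.
ExternalDbClient db = new ExternalDbClient("db://example-host");

// Terminal operation: each record is handed to the external system.
// Kafka Streams provides no delivery guarantees beyond this point.
stream.foreach((key, value) -&gt; db.upsert(key, value));
</pre>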
<spanid="streams-developer-guide-dsl-scala-dsl"></span><h2><aclass="toc-backref"href="#id27">Kafka Streams DSL for Scala</a><aclass="headerlink"href="#scala-dsl"title="Permalink to this headline"></a></h2>
<pid="scala-dsl-motivation">The Kafka Streams DSL Java APIs are based on the Builder design pattern, which allows users to incrementally build the target functionality using lower level compositional fluent APIs. These APIs can be called from Scala, but there are several issues:</p>
<olclass="last arabic simple">
<li><strong>Additional type annotations</strong> - The Java APIs use Java generics in a way that is not fully compatible with the type inference of the Scala compiler. Hence the user has to add type annotations to the Scala code, which feels rather non-idiomatic in Scala.</li>
<li><strong>Verbosity</strong> - In some cases the Java APIs appear too verbose compared to idiomatic Scala.</li>
<li><strong>Type Unsafety</strong> - The Java APIs offer some options where compile-time type safety can be subverted, resulting in runtime errors. This stems from the fact that the SerDes defined as part of the config are not type-checked at compile time. Hence any missing SerDes can result in runtime errors.</li>
</ol>
<pid="scala-dsl-overview">The Kafka Streams DSL for Scala library is a wrapper over the existing Java APIs for Kafka Streams DSL that addresses the concerns raised above.
It does not attempt to provide idiomatic Scala APIs that one would implement in a Scala library developed from scratch. The intention is to make the Java APIs more usable in Scala through better type inference, enhanced expressiveness, and less boilerplate.</p>
<p>The library wraps the Java Streams DSL APIs in Scala, thereby providing:</p>
<olclass="last arabic simple">
<li>Better type inference in Scala.</li>
<li>Less boilerplate in application code.</li>
<li>The usual builder-style composition that developers get with the original Java API.</li>
<li>Implicit serializers and de-serializers leading to better abstraction and less verbosity.</li>
<li>Better type safety during compile time.</li>
</ol>
<p>All functionality provided by the Kafka Streams DSL for Scala lives under the root package <codeclass="docutils literal"><spanclass="pre">org.apache.kafka.streams.scala</span></code>.</p>
<p>Many of the public facing types from the Java API are wrapped. The following Scala abstractions are available to the user:</p>
<ul>
<li><codeclass="docutils literal"><spanclass="pre">org.apache.kafka.streams.scala.StreamsBuilder</span></code></li>
<li><codeclass="docutils literal"><spanclass="pre">org.apache.kafka.streams.scala.kstream.KStream</span></code></li>
<li><codeclass="docutils literal"><spanclass="pre">org.apache.kafka.streams.scala.kstream.KTable</span></code></li>
<li><codeclass="docutils literal"><spanclass="pre">org.apache.kafka.streams.scala.kstream.KGroupedStream</span></code></li>
<li><codeclass="docutils literal"><spanclass="pre">org.apache.kafka.streams.scala.kstream.KGroupedTable</span></code></li>
<li><codeclass="docutils literal"><spanclass="pre">org.apache.kafka.streams.scala.kstream.SessionWindowedKStream</span></code></li>
<li><codeclass="docutils literal"><spanclass="pre">org.apache.kafka.streams.scala.kstream.TimeWindowedKStream</span></code></li>
</ul>
<p>The library also has several utility abstractions and modules that the user needs to use for proper semantics.</p>
<ul>
<li><codeclass="docutils literal"><spanclass="pre">org.apache.kafka.streams.scala.ImplicitConversions</span></code>: Module that brings into scope the implicit conversions between the Scala and Java classes.</li>
<li><codeclass="docutils literal"><spanclass="pre">org.apache.kafka.streams.scala.Serdes</span></code>: Module that contains all primitive SerDes that can be imported as implicits and a helper to create custom SerDes.</li>
</ul>
<p>The library is cross-built with Scala 2.11 and 2.12. To use the version compiled against Scala 2.11, declare a Maven dependency in your <code>pom.xml</code> on the artifact <codeclass="docutils literal"><spanclass="pre">kafka-streams-scala_2.11</span></code> (group id <codeclass="docutils literal"><spanclass="pre">org.apache.kafka</span></code>); to use the version compiled against Scala 2.12, use the artifact <codeclass="docutils literal"><spanclass="pre">kafka-streams-scala_2.12</span></code> instead.</p>
<p>When using sbt, you can reference the library as follows:</p>
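<preclass="brush: scala;">
// sbt selects the artifact for the project's Scala binary version via %%.
// The version shown here is a placeholder; use the Kafka version you build against.
libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "2.0.0"
</pre>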
<spanid="streams-developer-guide-dsl-sample-usage"></span><h3><aclass="toc-backref"href="#id28">Sample Usage</a><aclass="headerlink"href="#scala-dsl-sample-usage"title="Permalink to this headline"></a></h3>
<p>The library works by wrapping the original Java abstractions of Kafka Streams in a Scala wrapper object and then using implicit conversions between them. All the Scala abstractions are named identically to the corresponding Java abstractions, but they reside in a different package of the library, e.g. the Scala class <codeclass="docutils literal"><spanclass="pre">org.apache.kafka.streams.scala.StreamsBuilder</span></code> is a wrapper around <codeclass="docutils literal"><spanclass="pre">org.apache.kafka.streams.StreamsBuilder</span></code>, <codeclass="docutils literal"><spanclass="pre">org.apache.kafka.streams.scala.kstream.KStream</span></code> is a wrapper around <codeclass="docutils literal"><spanclass="pre">org.apache.kafka.streams.kstream.KStream</span></code>, and so on.</p>
<p>Here's an example of the classic WordCount program. It uses the Scala <codeclass="docutils literal"><spanclass="pre">StreamsBuilder</span></code> to build an instance of <codeclass="docutils literal"><spanclass="pre">KStream</span></code>, which is a wrapper around the Java <codeclass="docutils literal"><spanclass="pre">KStream</span></code>. We then reify to a table and get a <codeclass="docutils literal"><spanclass="pre">KTable</span></code>, which, again, is a wrapper around the Java <codeclass="docutils literal"><spanclass="pre">KTable</span></code>.</p>
<p>The net result is that the following code is structured just like code using the Java API, but written in Scala and with far fewer type annotations than the Java API would require from Scala. The difference in type annotation usage is most obvious with a concrete example, so below is a WordCount implementation that demonstrates it (the input and output topic names are illustrative).</p>
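<preclass="brush: scala;">
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

val builder = new StreamsBuilder

// Read the stream of text lines (topic names here are illustrative).
val textLines: KStream[String, String] =
  builder.stream[String, String]("streams-plaintext-input")

// Split each line into words, group by word, and count the occurrences.
val wordCounts: KTable[String, Long] =
  textLines
    .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
    .groupBy((_, word) => word)
    .count()

// Write the continuously updated counts back to an output topic.
wordCounts.toStream.to("streams-wordcount-output")
</pre>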
<p>In the above code snippet, we don't have to provide any SerDes, <codeclass="docutils literal"><spanclass="pre">Serialized</span></code>, <codeclass="docutils literal"><spanclass="pre">Produced</span></code>, <codeclass="docutils literal"><spanclass="pre">Consumed</span></code> or <codeclass="docutils literal"><spanclass="pre">Joined</span></code> explicitly. Nor do they depend on any SerDes specified in the config; <strong>in fact, all SerDes specified in the config are ignored by the Scala APIs</strong>. All SerDes and instances of <codeclass="docutils literal"><spanclass="pre">Serialized</span></code>, <codeclass="docutils literal"><spanclass="pre">Produced</span></code>, <codeclass="docutils literal"><spanclass="pre">Consumed</span></code> or <codeclass="docutils literal"><spanclass="pre">Joined</span></code> are handled through implicit SerDes, as discussed later in the <ahref="#scala-dsl-implicit-serdes">Implicit SerDes</a> section. This complete independence from configuration-based SerDes is what makes the library fully type-safe: any missing instance of SerDes, <codeclass="docutils literal"><spanclass="pre">Serialized</span></code>, <codeclass="docutils literal"><spanclass="pre">Produced</span></code>, <codeclass="docutils literal"><spanclass="pre">Consumed</span></code> or <codeclass="docutils literal"><spanclass="pre">Joined</span></code> is flagged as a compile-time error.</p>
<spanid="streams-developer-guide-dsl-scala-dsl-implicit-serdes"></span><h3><aclass="toc-backref"href="#id29">Implicit SerDes</a><aclass="headerlink"href="#scala-dsl-implicit-serdes"title="Permalink to this headline"></a></h3>
<p>One of the common complaints of Scala users about the Java API has been the repetitive specification of SerDes in API invocations. Many of the APIs take SerDes through abstractions like <codeclass="docutils literal"><spanclass="pre">Serialized</span></code>, <codeclass="docutils literal"><spanclass="pre">Produced</span></code>, <codeclass="docutils literal"><spanclass="pre">Consumed</span></code> or <codeclass="docutils literal"><spanclass="pre">Joined</span></code>, and the user has to supply them every time through the <codeclass="docutils literal"><spanclass="pre">with</span></code> method of these classes.</p>
<p>The library uses the power of <ahref="https://docs.scala-lang.org/tour/implicit-parameters.html">Scala implicit parameters</a> to alleviate this concern. As a user you can provide implicit SerDes or implicit values of <codeclass="docutils literal"><spanclass="pre">Serialized</span></code>, <codeclass="docutils literal"><spanclass="pre">Produced</span></code>, <codeclass="docutils literal"><spanclass="pre">Consumed</span></code> or <codeclass="docutils literal"><spanclass="pre">Joined</span></code> once and make your code less verbose. In fact you can just have the implicit SerDes in scope and the library will make the instances of <codeclass="docutils literal"><spanclass="pre">Serialized</span></code>, <codeclass="docutils literal"><spanclass="pre">Produced</span></code>, <codeclass="docutils literal"><spanclass="pre">Consumed</span></code> or <codeclass="docutils literal"><spanclass="pre">Joined</span></code> available in scope.</p>
<p>The library also bundles all implicit SerDes of the commonly used primitive types in a Scala module - so just import the module vals and have all SerDes in scope. A similar strategy of modular implicits can be adopted for any user-defined SerDes as well (User-defined SerDes are discussed in the next section).</p>
<p>Here's an example:</p>
<preclass="brush: scala;">
// DefaultSerdes brings into scope implicit SerDes (mostly for primitives)
// that will set up all Serialized, Produced, Consumed and Joined instances.
// So all APIs below that accept Serialized, Produced, Consumed or Joined will
<ul>
<li>All SerDes are picked up from the implicits in scope, and <codeclass="docutils literal"><spanclass="pre">import Serdes._</span></code> brings all the necessary SerDes into scope.</li>
</ul>
<spanid="streams-developer-guide-dsl-scala-dsl-user-defined-serdes"></span><h3><aclass="toc-backref"href="#id30">User-Defined SerDes</a><aclass="headerlink"href="#scala-dsl-user-defined-serdes"title="Permalink to this headline"></a></h3>
<p>When the default primitive SerDes are not enough and we need to define custom SerDes, the usage is exactly the same as above. Just define the implicit SerDes and start building the stream transformation. Here's an example with <codeclass="docutils literal"><spanclass="pre">AvroSerde</span></code>:</p>
<preclass="brush: scala;">
// domain object as a case class
case class UserClicks(clicks: Long)
// An implicit Serde implementation for the values we want to
// serialize as avro
implicit val userClicksSerde: Serde[UserClicks] = new AvroSerde