<li><aclass="reference internal"href="#applying-processors-and-transformers-processor-api-integration"id="id24">Applying processors and transformers (Processor API integration)</a></li>
</ul>
</li>
<li><aclass="reference internal"href="#writing-streams-back-to-kafka"id="id25">Writing streams back to Kafka</a></li>
</ul>
</div>
<divclass="section"id="overview">
<h2><aclass="toc-backref"href="#id7">Overview</a><aclass="headerlink"href="#overview"title="Permalink to this headline"></a></h2>
<p>In comparison to the <aclass="reference internal"href="processor-api.html#streams-developer-guide-processor-api"><spanclass="std std-ref">Processor API</span></a>, only the DSL supports:</p>
<ulclass="simple">
<li>Built-in abstractions for <aclass="reference internal"href="../concepts.html#streams-concepts-duality"><spanclass="std std-ref">streams and tables</span></a> in the form of
<aclass="reference internal"href="../concepts.html#streams-concepts-kstream"><spanclass="std std-ref">KStream</span></a>, <aclass="reference internal"href="../concepts.html#streams-concepts-ktable"><spanclass="std std-ref">KTable</span></a>, and
<aclass="reference internal"href="../concepts.html#streams-concepts-globalktable"><spanclass="std std-ref">GlobalKTable</span></a>. Having first-class support for streams and tables is crucial
because, in practice, most use cases require not just either streams or databases/tables, but a combination of both.
For example, if your use case is to create a customer 360-degree view that is updated in real-time, what your
application will be doing is transforming many input <em>streams</em> of customer-related events into an output <em>table</em>
that contains a continuously updated 360-degree view of your customers.</li>
<li>Declarative, functional programming style with
<aclass="reference internal"href="#streams-developer-guide-dsl-transformations-stateless"><spanclass="std std-ref">stateless transformations</span></a> (e.g. <codeclass="docutils literal"><spanclass="pre">map</span></code> and <codeclass="docutils literal"><spanclass="pre">filter</span></code>)
as well as <aclass="reference internal"href="#streams-developer-guide-dsl-transformations-stateful"><spanclass="std std-ref">stateful transformations</span></a>
such as <aclass="reference internal"href="#streams-developer-guide-dsl-aggregating"><spanclass="std std-ref">aggregations</span></a> (e.g. <codeclass="docutils literal"><spanclass="pre">count</span></code> and <codeclass="docutils literal"><spanclass="pre">reduce</span></code>),
<aclass="reference internal"href="#streams-developer-guide-dsl-joins"><spanclass="std std-ref">joins</span></a> (e.g. <codeclass="docutils literal"><spanclass="pre">leftJoin</span></code>), and
<p>With the DSL, you can define <aclass="reference internal"href="../concepts.html#streams-concepts-processor-topology"><spanclass="std std-ref">processor topologies</span></a> (i.e., the logical
processing plan) in your application. The steps to accomplish this are:</p>
<olclass="arabic simple">
<li>Specify <aclass="reference internal"href="#streams-developer-guide-dsl-sources"><spanclass="std std-ref">one or more input streams that are read from Kafka topics</span></a>.</li>
<li>Compose <aclass="reference internal"href="#streams-developer-guide-dsl-transformations"><spanclass="std std-ref">transformations</span></a> on these streams.</li>
<li>Write the <aclass="reference internal"href="#streams-developer-guide-dsl-destinations"><spanclass="std std-ref">resulting output streams back to Kafka topics</span></a>, or expose the processing results of your application directly to other applications through <aclass="reference internal"href="interactive-queries.html#streams-developer-guide-interactive-queries"><spanclass="std std-ref">interactive queries</span></a> (e.g., via a REST API).</li>
</ol>
<p>After the application is run, the defined processor topologies are continuously executed (i.e., the processing plan is put into
action). A step-by-step guide for writing a stream processing application using the DSL is provided below.</p>
<p>For a complete list of available API functionality, see also the <aclass="reference internal"href="../javadocs.html#streams-javadocs"><spanclass="std std-ref">Kafka Streams Javadocs</span></a>.</p>
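<p>For example, a minimal sketch of these three steps (the topic names and the transformation are illustrative):</p>
<div class="highlight-java"><div class="highlight"><pre><span></span>StreamsBuilder builder = new StreamsBuilder();

// 1. Read an input stream from a Kafka topic.
KStream<String, String> source = builder.stream("input-topic");

// 2. Compose transformations on the stream.
KStream<String, String> upperCased = source.mapValues(value -> value.toUpperCase());

// 3. Write the resulting stream back to a Kafka topic.
upperCased.to("output-topic");
</pre></div></div>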
<spanid="streams-developer-guide-dsl-sources"></span><h2><aclass="toc-backref"href="#id8">Creating source streams from Kafka</a><aclass="headerlink"href="#creating-source-streams-from-kafka"title="Permalink to this headline"></a></h2>
<p>You can easily read data from Kafka topics into your application. The following operations are supported.</p>
<td><pclass="first">Creates a <aclass="reference internal"href="../concepts.html#streams-concepts-kstream"><spanclass="std std-ref">KStream</span></a> from the specified Kafka input topics and interprets the data
as a <aclass="reference internal"href="../concepts.html#streams-concepts-kstream"><spanclass="std std-ref">record stream</span></a>.
A <codeclass="docutils literal"><spanclass="pre">KStream</span></code> represents a <em>partitioned</em> record stream.
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Long</span><spanclass="o">()</span><spanclass="cm">/* value serde */</span>
<spanclass="o">);</span>
</pre></div>
</div>
<p>If you do not specify SerDes explicitly, the default SerDes from the
<aclass="reference internal"href="config-streams.html#streams-developer-guide-configuration"><spanclass="std std-ref">configuration</span></a> are used.</p>
<p>You <strong>must specify SerDes explicitly</strong> if the key or value types of the records in the Kafka input
topics do not match the configured default SerDes. For information about configuring default SerDes, available
SerDes, and implementing your own custom SerDes see <aclass="reference internal"href="datatypes.html#streams-developer-guide-serdes"><spanclass="std std-ref">Data Types and Serialization</span></a>.</p>
<pclass="last">Several variants of <codeclass="docutils literal"><spanclass="pre">stream</span></code> exist, for example to specify a regex pattern for input topics to read from).</p>
<td><pclass="first">Reads the specified Kafka input topic into a <aclass="reference internal"href="../concepts.html#streams-concepts-ktable"><spanclass="std std-ref">KTable</span></a>. The topic is
interpreted as a changelog stream, where records with the same key are interpreted as UPSERT aka INSERT/UPDATE
(when the record value is not <codeclass="docutils literal"><spanclass="pre">null</span></code>) or as DELETE (when the value is <codeclass="docutils literal"><spanclass="pre">null</span></code>) for that key.
<p>In the case of a KTable, the local KTable instance of every application instance will
be populated with data from only <strong>a subset</strong> of the partitions of the input topic. Collectively, across
all application instances, all input topic partitions are read and processed.</p>
<p>You must provide a name for the table (more precisely, for the internal
<aclass="reference internal"href="../architecture.html#streams-architecture-state"><spanclass="std std-ref">state store</span></a> that backs the table). This is required for
supporting <aclass="reference internal"href="interactive-queries.html#streams-developer-guide-interactive-queries"><spanclass="std std-ref">interactive queries</span></a> against the table. When a
name is not provided, the table will not be queryable and an internal name will be used for the state store.</p>
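<p>For example, a sketch of reading a topic into a queryable table (the topic and store names are illustrative):</p>
<div class="highlight-java"><div class="highlight"><pre><span></span>StreamsBuilder builder = new StreamsBuilder();

KTable<String, Long> wordCounts = builder.table(
    "word-counts-input-topic", /* input topic */
    Materialized.as("word-counts-store")); /* state store name, making the table queryable */
</pre></div></div>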
<p>If you do not specify SerDes explicitly, the default SerDes from the
<aclass="reference internal"href="config-streams.html#streams-developer-guide-configuration"><spanclass="std std-ref">configuration</span></a> are used.</p>
<p>You <strong>must specify SerDes explicitly</strong> if the key or value types of the records in the Kafka input
topics do not match the configured default SerDes. For information about configuring default SerDes, available
SerDes, and implementing your own custom SerDes see <aclass="reference internal"href="datatypes.html#streams-developer-guide-serdes"><spanclass="std std-ref">Data Types and Serialization</span></a>.</p>
<pclass="last">Several variants of <codeclass="docutils literal"><spanclass="pre">table</span></code> exist, for example to specify the <codeclass="docutils literal"><spanclass="pre">auto.offset.reset</span></code> policy to be used when
<td><pclass="first">Reads the specified Kafka input topic into a <aclass="reference internal"href="../concepts.html#streams-concepts-globalktable"><spanclass="std std-ref">GlobalKTable</span></a>. The topic is
interpreted as a changelog stream, where records with the same key are interpreted as UPSERT aka INSERT/UPDATE
(when the record value is not <codeclass="docutils literal"><spanclass="pre">null</span></code>) or as DELETE (when the value is <codeclass="docutils literal"><spanclass="pre">null</span></code>) for that key.
<p>In the case of a GlobalKTable, the local GlobalKTable instance of every application instance will
be populated with data from <strong>all</strong> partitions of the input topic.</p>
<p>You must provide a name for the table (more precisely, for the internal
<aclass="reference internal"href="../architecture.html#streams-architecture-state"><spanclass="std std-ref">state store</span></a> that backs the table). This is required for
supporting <aclass="reference internal"href="interactive-queries.html#streams-developer-guide-interactive-queries"><spanclass="std std-ref">interactive queries</span></a> against the table. When a
name is not provided, the table will not be queryable and an internal name will be used for the state store.</p>
<spanclass="o">.</span><spanclass="na">withValueSerde</span><spanclass="o">(</span><spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Long</span><spanclass="o">())</span><spanclass="cm">/* value serde */</span>
<spanclass="o">);</span>
</pre></div>
</div>
<p>You <strong>must specify SerDes explicitly</strong> if the key or value types of the records in the Kafka input
topics do not match the configured default SerDes. For information about configuring default SerDes, available
SerDes, and implementing your own custom SerDes see <aclass="reference internal"href="datatypes.html#streams-developer-guide-serdes"><spanclass="std std-ref">Data Types and Serialization</span></a>.</p>
<pclass="last">Several variants of <codeclass="docutils literal"><spanclass="pre">globalTable</span></code> exist to e.g. specify explicit SerDes.</p>
</td>
</tr>
</tbody>
</table>
</div>
<divclass="section"id="transform-a-stream">
<spanid="streams-developer-guide-dsl-transformations"></span><h2><aclass="toc-backref"href="#id9">Transform a stream</a><aclass="headerlink"href="#transform-a-stream"title="Permalink to this headline"></a></h2>
<p>The KStream and KTable interfaces support a variety of transformation operations.
Each of these operations may be translated into one or more connected processors in the underlying processor topology.
Since KStream and KTable are strongly typed, all of these transformation operations are defined as
generic functions where users can specify the input and output data types.</p>
<p>Some KStream transformations may generate one or more KStream objects, for example:</p>
<ul class="simple">
<li><code class="docutils literal"><span class="pre">filter</span></code> and <code class="docutils literal"><span class="pre">map</span></code> on a KStream will generate another KStream</li>
<li><code class="docutils literal"><span class="pre">branch</span></code> on a KStream can generate multiple KStreams</li>
</ul>
<p>Some others may generate a KTable object, for example an aggregation of a KStream also yields a KTable. This allows Kafka Streams to continuously update the computed value upon the arrival of <a class="reference internal" href="../concepts.html#streams-concepts-aggregations"><span class="std std-ref">late records</span></a> after the value
has already been produced to downstream transformation operators.</p>
<p>All KTable transformation operations can only generate another KTable. However, the Kafka Streams DSL does provide a special function
that converts a KTable representation into a KStream. All of these transformation methods can be chained together to compose
a complex processor topology.</p>
<p>These transformation operations are described in the following subsections:</p>
<spanid="streams-developer-guide-dsl-transformations-stateless"></span><h3><aclass="toc-backref"href="#id10">Stateless transformations</a><aclass="headerlink"href="#stateless-transformations"title="Permalink to this headline"></a></h3>
<p>Stateless transformations do not require state for processing and they do not require a state store associated with
the stream processor. Kafka 0.11.0 and later allows you to materialize the result from a stateless <codeclass="docutils literal"><spanclass="pre">KTable</span></code> transformation. This allows the result to be queried through <aclass="reference internal"href="interactive-queries.html#streams-developer-guide-interactive-queries"><spanclass="std std-ref">interactive queries</span></a>. To materialize a <codeclass="docutils literal"><spanclass="pre">KTable</span></code>, each of the below stateless operations <aclass="reference internal"href="interactive-queries.html#streams-developer-guide-interactive-queries-local-key-value-stores"><spanclass="std std-ref">can be augmented</span></a> with an optional <codeclass="docutils literal"><spanclass="pre">queryableStoreName</span></code> argument.</p>
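<p>For example, a minimal sketch of materializing the result of a stateless <code class="docutils literal"><span class="pre">KTable</span></code> operation via the <code class="docutils literal"><span class="pre">Materialized</span></code> variant (the store name is illustrative):</p>
<div class="highlight-java"><div class="highlight"><pre><span></span>KTable<String, Long> table = ...;

// The filtered result is materialized into a named state store, which can then
// be accessed through interactive queries under that name.
KTable<String, Long> filtered = table.filter(
    (key, value) -> value != null && value > 0,
    Materialized.as("filtered-table-store"));
</pre></div></div>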
<td><pclass="first">Branch (or split) a <codeclass="docutils literal"><spanclass="pre">KStream</span></code> based on the supplied predicates into one or more <codeclass="docutils literal"><spanclass="pre">KStream</span></code> instances.
<spanclass="o">(</span><spanclass="n">key</span><spanclass="o">,</span><spanclass="n">value</span><spanclass="o">)</span><spanclass="o">-></span><spanclass="n">key</span><spanclass="o">.</span><spanclass="na">startsWith</span><spanclass="o">(</span><spanclass="s">"A"</span><spanclass="o">),</span><spanclass="cm">/* first predicate */</span>
<spanclass="o">(</span><spanclass="n">key</span><spanclass="o">,</span><spanclass="n">value</span><spanclass="o">)</span><spanclass="o">-></span><spanclass="n">key</span><spanclass="o">.</span><spanclass="na">startsWith</span><spanclass="o">(</span><spanclass="s">"B"</span><spanclass="o">),</span><spanclass="cm">/* second predicate */</span>
<spanclass="o">(</span><spanclass="n">key</span><spanclass="o">,</span><spanclass="n">value</span><spanclass="o">)</span><spanclass="o">-></span><spanclass="kc">true</span><spanclass="cm">/* third predicate */</span>
<spanclass="o">);</span>
<spanclass="c1">// KStream branches[0] contains all records whose keys start with "A"</span>
<spanclass="c1">// KStream branches[1] contains all records whose keys start with "B"</span>
<spanclass="c1">// KStream branches[2] contains all other records</span>
<spanclass="c1">// Java 7 example: cf. `filter` for how to create `Predicate` instances</span>
<p><strong>Marks the stream for data re-partitioning:</strong>
Applying a grouping or a join after <codeclass="docutils literal"><spanclass="pre">flatMap</span></code> will result in re-partitioning of the records.
If possible use <codeclass="docutils literal"><spanclass="pre">flatMapValues</span></code> instead, which will not cause data re-partitioning.</p>
<p><codeclass="docutils literal"><spanclass="pre">flatMapValues</span></code> is preferable to <codeclass="docutils literal"><spanclass="pre">flatMap</span></code> because it will not cause data re-partitioning. However, you
cannot modify the key or key type like <codeclass="docutils literal"><spanclass="pre">flatMap</span></code> does.</p>
<divclass="last highlight-java"><divclass="highlight"><pre><span></span><spanclass="c1">// Split a sentence into words.</span>
<p>You would use <codeclass="docutils literal"><spanclass="pre">foreach</span></code> to cause <em>side effects</em> based on the input data (similar to <codeclass="docutils literal"><spanclass="pre">peek</span></code>) and then <em>stop</em>
<em>further processing</em> of the input data (unlike <codeclass="docutils literal"><spanclass="pre">peek</span></code>, which is not a terminal operation).</p>
<p><strong>Note on processing guarantees:</strong> Any side effects of an action (such as writing to external systems) are not
trackable by Kafka, which means they will typically not benefit from Kafka’s processing guarantees.</p>
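<p>For example, a sketch of a <code class="docutils literal"><span class="pre">foreach</span></code> action (the print action is illustrative):</p>
<div class="highlight-java"><div class="highlight"><pre><span></span>KStream<String, Long> stream = ...;

// Terminal operation: performs a side effect per record and returns void,
// so no further processing can be chained afterwards.
stream.foreach((key, value) -> System.out.println(key + " => " + value));
</pre></div></div>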
<p>Grouping is a prerequisite for <aclass="reference internal"href="#streams-developer-guide-dsl-aggregating"><spanclass="std std-ref">aggregating a stream or a table</span></a>
and ensures that data is properly partitioned (“keyed”) for subsequent operations.</p>
<p><strong>When to set explicit SerDes:</strong>
Variants of <codeclass="docutils literal"><spanclass="pre">groupByKey</span></code> exist to override the configured default SerDes of your application, which <strong>you</strong>
<strong>must do</strong> if the key and/or value types of the resulting <codeclass="docutils literal"><spanclass="pre">KGroupedStream</span></code> do not match the configured default
SerDes.</p>
<divclass="admonition note">
<pclass="first admonition-title">Note</p>
<pclass="last"><strong>Grouping vs. Windowing:</strong>
A related operation is <aclass="reference internal"href="#streams-developer-guide-dsl-windowing"><spanclass="std std-ref">windowing</span></a>, which lets you control how to
“sub-group” the grouped records <em>of the same key</em> into so-called <em>windows</em> for stateful operations such as
windowed <aclass="reference internal"href="#streams-developer-guide-dsl-aggregating"><spanclass="std std-ref">aggregations</span></a> or
<p><strong>Causes data re-partitioning if and only if the stream was marked for re-partitioning.</strong>
<codeclass="docutils literal"><spanclass="pre">groupByKey</span></code> is preferable to <codeclass="docutils literal"><spanclass="pre">groupBy</span></code> because it re-partitions data only if the stream was already marked
for re-partitioning. However, <code class="docutils literal"><span class="pre">groupByKey</span></code> does not allow you to modify the key or key type like <code class="docutils literal"><span class="pre">groupBy</span></code>
does.</p>
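<p>For example, a sketch of grouping by the existing key with explicit SerDes (assuming a <code class="docutils literal"><span class="pre">KStream<byte[], String></span></code> and the <code class="docutils literal"><span class="pre">Serialized</span></code> variant):</p>
<div class="highlight-java"><div class="highlight"><pre><span></span>KStream<byte[], String> stream = ...;

// Group by the existing record key; explicit SerDes are given because the
// key/value types differ from the configured defaults.
KGroupedStream<byte[], String> groupedStream = stream.groupByKey(
    Serialized.with(
      Serdes.ByteArray(), /* key */
      Serdes.String())    /* value */
  );
</pre></div></div>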
<td><pclass="first">Groups the records by a <em>new</em> key, which may be of a different key type.
When grouping a table, you may also specify a new value and value type.
<codeclass="docutils literal"><spanclass="pre">groupBy</span></code> is a shorthand for <codeclass="docutils literal"><spanclass="pre">selectKey(...).groupByKey()</span></code>.
<p>Grouping is a prerequisite for <aclass="reference internal"href="#streams-developer-guide-dsl-aggregating"><spanclass="std std-ref">aggregating a stream or a table</span></a>
and ensures that data is properly partitioned (“keyed”) for subsequent operations.</p>
<p><strong>When to set explicit SerDes:</strong>
Variants of <codeclass="docutils literal"><spanclass="pre">groupBy</span></code> exist to override the configured default SerDes of your application, which <strong>you must</strong>
<strong>do</strong> if the key and/or value types of the resulting <codeclass="docutils literal"><spanclass="pre">KGroupedStream</span></code> or <codeclass="docutils literal"><spanclass="pre">KGroupedTable</span></code> do not match the
configured default SerDes.</p>
<divclass="admonition note">
<pclass="first admonition-title">Note</p>
<pclass="last"><strong>Grouping vs. Windowing:</strong>
A related operation is <aclass="reference internal"href="#streams-developer-guide-dsl-windowing"><spanclass="std std-ref">windowing</span></a>, which lets you control how to
“sub-group” the grouped records <em>of the same key</em> into so-called <em>windows</em> for stateful operations such as
windowed <aclass="reference internal"href="#streams-developer-guide-dsl-aggregating"><spanclass="std std-ref">aggregations</span></a> or
<p><strong>Always causes data re-partitioning:</strong><codeclass="docutils literal"><spanclass="pre">groupBy</span></code> always causes data re-partitioning.
If possible use <codeclass="docutils literal"><spanclass="pre">groupByKey</span></code> instead, which will re-partition data only if required.</p>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">String</span><spanclass="o">(),</span><spanclass="cm">/* key (note: type was modified) */</span>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">String</span><spanclass="o">())</span><spanclass="cm">/* value */</span>
<spanclass="o">);</span>
<spanclass="c1">// Group the table by a new key and key type, and also modify the value and value type.</span>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">String</span><spanclass="o">(),</span><spanclass="cm">/* key (note: type was modified) */</span>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Integer</span><spanclass="o">())</span><spanclass="cm">/* value (note: type was modified) */</span>
<spanclass="o">);</span>
<spanclass="c1">// Java 7 examples</span>
<spanclass="c1">// Group the stream by a new key and key type</span>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">String</span><spanclass="o">(),</span><spanclass="cm">/* key (note: type was modified) */</span>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">String</span><spanclass="o">())</span><spanclass="cm">/* value */</span>
<spanclass="o">);</span>
<spanclass="c1">// Group the table by a new key and key type, and also modify the value and value type.</span>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">String</span><spanclass="o">(),</span><spanclass="cm">/* key (note: type was modified) */</span>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Integer</span><spanclass="o">())</span><spanclass="cm">/* value (note: type was modified) */</span>
<p><codeclass="docutils literal"><spanclass="pre">mapValues</span></code> is preferable to <codeclass="docutils literal"><spanclass="pre">map</span></code> because it will not cause data re-partitioning. However, it does not
allow you to modify the key or key type like <codeclass="docutils literal"><spanclass="pre">map</span></code> does.</p>
<p>You would use <codeclass="docutils literal"><spanclass="pre">peek</span></code> to cause <em>side effects</em> based on the input data (similar to <codeclass="docutils literal"><spanclass="pre">foreach</span></code>) and <em>continue</em>
<em>processing</em> the input data (unlike <codeclass="docutils literal"><spanclass="pre">foreach</span></code>, which is a terminal operation). <codeclass="docutils literal"><spanclass="pre">peek</span></code> returns the input
stream as-is; if you need to modify the input stream, use <codeclass="docutils literal"><spanclass="pre">map</span></code> or <codeclass="docutils literal"><spanclass="pre">mapValues</span></code> instead.</p>
<p><codeclass="docutils literal"><spanclass="pre">peek</span></code> is helpful for use cases such as logging or tracking metrics or for debugging and troubleshooting.</p>
<p><strong>Note on processing guarantees:</strong> Any side effects of an action (such as writing to external systems) are not
trackable by Kafka, which means they will typically not benefit from Kafka’s processing guarantees.</p>
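<p>For example, a sketch of <code class="docutils literal"><span class="pre">peek</span></code> used for logging (the action is illustrative):</p>
<div class="highlight-java"><div class="highlight"><pre><span></span>KStream<byte[], String> stream = ...;

// peek performs a side effect per record and returns the stream unchanged,
// so downstream processing continues as before.
KStream<byte[], String> unmodifiedStream = stream.peek(
    (key, value) -> System.out.println("key=" + key + ", value=" + value));
</pre></div></div>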
<td><pclass="first"><strong>Terminal operation.</strong> Prints the records to <codeclass="docutils literal"><spanclass="pre">System.out</span></code>. See Javadocs for serde and <codeclass="docutils literal"><spanclass="pre">toString()</span></code>
<p>Calling <codeclass="docutils literal"><spanclass="pre">print()</span></code> is the same as calling <codeclass="docutils literal"><spanclass="pre">foreach((key,</span><spanclass="pre">value)</span><spanclass="pre">-></span><spanclass="pre">System.out.println(key</span><spanclass="pre">+</span><spanclass="pre">",</span><spanclass="pre">"</span><spanclass="pre">+</span><spanclass="pre">value))</span></code></p>
<p>Calling <codeclass="docutils literal"><spanclass="pre">selectKey(mapper)</span></code> is the same as calling <codeclass="docutils literal"><spanclass="pre">map((key,</span><spanclass="pre">value)</span><spanclass="pre">-></span><spanclass="pre">mapper(key,</span><spanclass="pre">value),</span><spanclass="pre">value)</span></code>.</p>
<p><strong>Marks the stream for data re-partitioning:</strong>
Applying a grouping or a join after <codeclass="docutils literal"><spanclass="pre">selectKey</span></code> will result in re-partitioning of the records.</p>
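<p>For example, a sketch of assigning a new key derived from the record value (the derivation is illustrative):</p>
<div class="highlight-java"><div class="highlight"><pre><span></span>KStream<byte[], String> stream = ...;

// Derive a new record key from the record's value; here, the first
// whitespace-separated token of the value becomes the key.
KStream<String, String> rekeyed = stream.selectKey((key, value) -> value.split(" ")[0]);
</pre></div></div>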
<spanid="streams-developer-guide-dsl-transformations-stateful"></span><h3><aclass="toc-backref"href="#id11">Stateful transformations</a><aclass="headerlink"href="#stateful-transformations"title="Permalink to this headline"></a></h3>
<pid="streams-developer-guide-dsl-transformations-stateful-overview">Stateful transformations depend on state for processing inputs and producing outputs and require a <aclass="reference internal"href="../architecture.html#streams-architecture-state"><spanclass="std std-ref">state store</span></a> associated with the stream processor. For example, in aggregating operations, a windowing state store is used to collect the latest aggregation results per
window. In join operations, a windowing state store is used to collect all of the records received so far within the
defined window boundary.</p>
<p>Note that state stores are fault-tolerant.
In case of failure, Kafka Streams guarantees to fully restore all state stores prior to resuming the processing.
See <aclass="reference internal"href="../architecture.html#streams-architecture-fault-tolerance"><spanclass="std std-ref">Fault Tolerance</span></a> for further information.</p>
<p>Available stateful transformations in the DSL include:</p>
<ul class="simple">
<li><a class="reference internal" href="#streams-developer-guide-dsl-aggregating"><span class="std std-ref">Aggregating</span></a></li>
<li><a class="reference internal" href="#streams-developer-guide-dsl-joins"><span class="std std-ref">Joining</span></a></li>
<li><a class="reference internal" href="#streams-developer-guide-dsl-windowing"><span class="std std-ref">Windowing</span></a> (as part of aggregations and joins)</li>
<li><a class="reference internal" href="#streams-developer-guide-dsl-process"><span class="std std-ref">Applying custom processors and transformers</span></a>, which may be stateful, for
Processor API integration</li>
</ul>
<p>The following diagram shows their relationships:</p>
<pclass="caption"><spanclass="caption-text">Stateful transformations in the DSL.</span></p>
</div>
<p>Here is an example of a stateful application: the WordCount algorithm.</p>
<p>WordCount example in Java 8+, using lambda expressions:</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span><spanclass="c1">// Assume the record values represent lines of text. For the sake of this example, you can ignore</span>
<spanclass="c1">// whatever may be stored in the record keys.</span>
<divclass="highlight-java"><divclass="highlight"><pre><span></span><spanclass="c1">// Code below is equivalent to the previous Java 8+ example above.</span>
<spanid="streams-developer-guide-dsl-aggregating"></span><h4><aclass="toc-backref"href="#id12">Aggregating</a><aclass="headerlink"href="#aggregating"title="Permalink to this headline"></a></h4>
<p>After records are <aclass="reference internal"href="#streams-developer-guide-dsl-transformations-stateless"><spanclass="std std-ref">grouped</span></a> by key via <codeclass="docutils literal"><spanclass="pre">groupByKey</span></code> or
<codeclass="docutils literal"><spanclass="pre">groupBy</span></code>– and thus represented as either a <codeclass="docutils literal"><spanclass="pre">KGroupedStream</span></code> or a <codeclass="docutils literal"><spanclass="pre">KGroupedTable</span></code>, they can be aggregated
via an operation such as <codeclass="docutils literal"><spanclass="pre">reduce</span></code>. Aggregations are key-based operations, which means that they always operate over records
(notably record values) of the same key.
You can perform aggregations on <aclass="reference internal"href="#streams-developer-guide-dsl-windowing"><spanclass="std std-ref">windowed</span></a> or non-windowed data.</p>
<td><pclass="first"><strong>Rolling aggregation.</strong> Aggregates the values of (non-windowed) records by the grouped key.
Aggregating is a generalization of <codeclass="docutils literal"><spanclass="pre">reduce</span></code> and allows, for example, the aggregate value to have a different
<p>When aggregating a <em>grouped stream</em>, you must provide an initializer (e.g., <codeclass="docutils literal"><spanclass="pre">aggValue</span><spanclass="pre">=</span><spanclass="pre">0</span></code>) and an “adder”
aggregator (e.g., <codeclass="docutils literal"><spanclass="pre">aggValue</span><spanclass="pre">+</span><spanclass="pre">curValue</span></code>). When aggregating a <em>grouped table</em>, you must provide a
<spanclass="n">Materialized</span><spanclass="o">.</span><spanclass="na">as</span><spanclass="o">(</span><spanclass="s">"aggregated-stream-store"</span><spanclass="o">)</span><spanclass="cm">/* state store name */</span>
<spanclass="o">.</span><spanclass="na">withValueSerde</span><spanclass="o">(</span><spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Long</span><spanclass="o">());</span><spanclass="cm">/* serde for aggregate value */</span>
<spanclass="c1">// Aggregating a KGroupedTable (note how the value type changes from String to Long)</span>
<spanclass="n">Materialized</span><spanclass="o">.</span><spanclass="na">as</span><spanclass="o">(</span><spanclass="s">"aggregated-table-store"</span><spanclass="o">)</span><spanclass="cm">/* state store name */</span>
<spanclass="o">.</span><spanclass="na">withValueSerde</span><spanclass="o">(</span><spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Long</span><spanclass="o">())</span><spanclass="cm">/* serde for aggregate value */</span>
<spanclass="c1">// Java 7 examples</span>
<spanclass="c1">// Aggregating a KGroupedStream (note how the value type changes from String to Long)</span>
<p>Detailed behavior of <codeclass="docutils literal"><spanclass="pre">KGroupedStream</span></code>:</p>
<ulclass="simple">
<li>Input records with <codeclass="docutils literal"><spanclass="pre">null</span></code> keys are ignored.</li>
<li>When a record key is received for the first time, the initializer is called (and called before the adder).</li>
<li>Whenever a record with a non-<codeclass="docutils literal"><spanclass="pre">null</span></code> value is received, the adder is called.</li>
</ul>
<p>Detailed behavior of <codeclass="docutils literal"><spanclass="pre">KGroupedTable</span></code>:</p>
<ulclass="simple">
<li>Input records with <codeclass="docutils literal"><spanclass="pre">null</span></code> keys are ignored.</li>
<li>When a record key is received for the first time, the initializer is called (and called before the adder
and subtractor). Note that, in contrast to <codeclass="docutils literal"><spanclass="pre">KGroupedStream</span></code>, over time the initializer may be called
more than once for a key as a result of having received input tombstone records for that key (see below).</li>
<li>When the first non-<codeclass="docutils literal"><spanclass="pre">null</span></code> value is received for a key (e.g., INSERT), then only the adder is called.</li>
<li>When subsequent non-<codeclass="docutils literal"><spanclass="pre">null</span></code> values are received for a key (e.g., UPDATE), then (1) the subtractor is
called with the old value as stored in the table and (2) the adder is called with the new value of the
input record that was just received. The order of execution for the subtractor and adder is not defined.</li>
<li>When a tombstone record – i.e. a record with a <codeclass="docutils literal"><spanclass="pre">null</span></code> value – is received for a key (e.g., DELETE),
then only the subtractor is called. Note that, whenever the subtractor returns a <codeclass="docutils literal"><spanclass="pre">null</span></code> value itself,
then the corresponding key is removed from the resulting <codeclass="docutils literal"><spanclass="pre">KTable</span></code>. If that happens, any next input
record for that key will trigger the initializer again.</li>
</ul>
<pclass="last">See the example at the bottom of this section for a visualization of the aggregation semantics.</p>
<td><p class="first"><strong>Windowed aggregation.</strong>
Aggregates the values of records, <a class="reference internal" href="#streams-developer-guide-dsl-windowing"><span class="std std-ref">per window</span></a>, by the grouped key.
Aggregating is a generalization of <code class="docutils literal"><span class="pre">reduce</span></code> and allows, for example, the aggregate value to have a different
type than the input values.</p>
<p>You must provide an initializer (e.g., <code class="docutils literal"><span class="pre">aggValue = 0</span></code>), an “adder” aggregator (e.g., <code class="docutils literal"><span class="pre">aggValue + curValue</span></code>),
and a window. When windowing based on sessions, you must additionally provide a “session merger” aggregator
(think: <code class="docutils literal"><span class="pre">mergedAggValue = leftAggValue + rightAggValue</span></code>).</p>
<div class="highlight-java"><div class="highlight"><pre><span></span>import java.util.concurrent.TimeUnit;

KGroupedStream<String, Long> groupedStream = ...;

// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream
    .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */
    .aggregate(
        () -> 0L, /* initializer */
        (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
        Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store") /* state store name */
          .withValueSerde(Serdes.Long())); /* serde for aggregate value */

// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
KTable<Windowed<String>, Long> sessionizedAggregatedStream = groupedStream
    .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */
    .aggregate(
        () -> 0L, /* initializer */
        (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
        (aggKey, leftAggValue, rightAggValue) -> leftAggValue + rightAggValue, /* session merger */
        Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("sessionized-aggregated-stream-store") /* state store name */
          .withValueSerde(Serdes.Long())); /* serde for aggregate value */
</pre></div></div>
<p>Detailed behavior:</p>
<ul class="simple">
<li>The windowed aggregate behaves similar to the rolling aggregate described above. The additional twist is that
the behavior applies <em>per window</em>.</li>
<li>Input records with <codeclass="docutils literal"><spanclass="pre">null</span></code> keys are ignored in general.</li>
<li>When a record key is received for the first time for a given window, the initializer is called (and called
before the adder).</li>
<li>Whenever a record with a non-<codeclass="docutils literal"><spanclass="pre">null</span></code> value is received for a given window, the adder is called.</li>
<li>When using session windows: the session merger is called whenever two sessions are being merged.</li>
</ul>
<pclass="last">See the example at the bottom of this section for a visualization of the aggregation semantics.</p>
<p>Detailed behavior for <codeclass="docutils literal"><spanclass="pre">KGroupedStream</span></code>:</p>
<ulclass="simple">
<li>Input records with <codeclass="docutils literal"><spanclass="pre">null</span></code> keys or values are ignored.</li>
</ul>
<p>Detailed behavior for <codeclass="docutils literal"><spanclass="pre">KGroupedTable</span></code>:</p>
<ulclass="last simple">
<li>Input records with <codeclass="docutils literal"><spanclass="pre">null</span></code> keys are ignored. Records with <codeclass="docutils literal"><spanclass="pre">null</span></code> values are not ignored but interpreted
as “tombstones” for the corresponding key, which indicate the deletion of the key from the table.</li>
Counts the number of records, <aclass="reference internal"href="#streams-developer-guide-dsl-windowing"><spanclass="std std-ref">per window</span></a>, by the grouped key.
<p>When reducing a <em>grouped stream</em>, you must provide an “adder” reducer (e.g., <code class="docutils literal"><span class="pre">aggValue + curValue</span></code>).
When reducing a <em>grouped table</em>, you must additionally provide a “subtractor” reducer (e.g.,
<code class="docutils literal"><span class="pre">aggValue - oldValue</span></code>).</p>
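<p>A sketch of both variants (the types are illustrative):</p>
<div class="highlight-java"><div class="highlight"><pre><span></span>KGroupedStream<String, Long> groupedStream = ...;
KGroupedTable<String, Long> groupedTable = ...;

// Reducing a KGroupedStream
KTable<String, Long> aggregatedStream = groupedStream.reduce(
    (aggValue, newValue) -> aggValue + newValue /* adder */);

// Reducing a KGroupedTable
KTable<String, Long> aggregatedTable = groupedTable.reduce(
    (aggValue, newValue) -> aggValue + newValue, /* adder */
    (aggValue, oldValue) -> aggValue - oldValue /* subtractor */);
</pre></div></div>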
<p>Detailed behavior for <codeclass="docutils literal"><spanclass="pre">KGroupedStream</span></code>:</p>
<ulclass="simple">
<li>Input records with <codeclass="docutils literal"><spanclass="pre">null</span></code> keys are ignored in general.</li>
<li>When a record key is received for the first time, then the value of that record is used as the initial
aggregate value.</li>
<li>Whenever a record with a non-<codeclass="docutils literal"><spanclass="pre">null</span></code> value is received, the adder is called.</li>
</ul>
<p>Detailed behavior for <codeclass="docutils literal"><spanclass="pre">KGroupedTable</span></code>:</p>
<ulclass="simple">
<li>Input records with <codeclass="docutils literal"><spanclass="pre">null</span></code> keys are ignored in general.</li>
<li>When a record key is received for the first time, then the value of that record is used as the initial
aggregate value.
Note that, in contrast to <codeclass="docutils literal"><spanclass="pre">KGroupedStream</span></code>, over time this initialization step may happen more than once
for a key as a result of having received input tombstone records for that key (see below).</li>
<li>When the first non-<codeclass="docutils literal"><spanclass="pre">null</span></code> value is received for a key (e.g., INSERT), then only the adder is called.</li>
<li>When subsequent non-<codeclass="docutils literal"><spanclass="pre">null</span></code> values are received for a key (e.g., UPDATE), then (1) the subtractor is
called with the old value as stored in the table and (2) the adder is called with the new value of the
input record that was just received. The order of execution for the subtractor and adder is not defined.</li>
<li>When a tombstone record – i.e. a record with a <codeclass="docutils literal"><spanclass="pre">null</span></code> value – is received for a key (e.g., DELETE),
then only the subtractor is called. Note that, whenever the subtractor returns a <codeclass="docutils literal"><spanclass="pre">null</span></code> value itself,
then the corresponding key is removed from the resulting <codeclass="docutils literal"><spanclass="pre">KTable</span></code>. If that happens, any next input
record for that key will re-initialize its aggregate value.</li>
</ul>
<pclass="last">See the example at the bottom of this section for a visualization of the aggregation semantics.</p>
Combines the values of records, <aclass="reference internal"href="#streams-developer-guide-dsl-windowing"><spanclass="std std-ref">per window</span></a>, by the grouped key.
The current record value is combined with the last reduced value, and a new reduced value is returned.
Records with <codeclass="docutils literal"><spanclass="pre">null</span></code> key or value are ignored.
The result value type cannot be changed, unlike <codeclass="docutils literal"><spanclass="pre">aggregate</span></code>.
<p>The windowed <code class="docutils literal"><span class="pre">reduce</span></code> turns a <code class="docutils literal"><span class="pre">TimeWindowedKStream<K, V></span></code> or a <code class="docutils literal"><span class="pre">SessionWindowedKStream<K, V></span></code>
into a windowed <codeclass="docutils literal"><spanclass="pre">KTable<Windowed<K>,</span><spanclass="pre">V></span></code>.</p>
<p>Several variants of <codeclass="docutils literal"><spanclass="pre">reduce</span></code> exist, see Javadocs for details.</p>
<p>Detailed behavior:</p>
<ul class="simple">
<li>The windowed reduce behaves similar to the rolling reduce described above. The additional twist is that the
behavior applies <em>per window</em>.</li>
<li>Input records with <codeclass="docutils literal"><spanclass="pre">null</span></code> keys are ignored in general.</li>
<li>When a record key is received for the first time for a given window, then the value of that record is used as
the initial aggregate value.</li>
<li>Whenever a record with a non-<codeclass="docutils literal"><spanclass="pre">null</span></code> value is received for a given window, the adder is called.</li>
</ul>
<pclass="last">See the example at the bottom of this section for a visualization of the aggregation semantics.</p>
</td>
</tr>
</tbody>
</table>
<p><strong>Example of semantics for stream aggregations:</strong>
A <codeclass="docutils literal"><spanclass="pre">KGroupedStream</span></code>→<codeclass="docutils literal"><spanclass="pre">KTable</span></code> example is shown below. The streams and the table are initially empty. Bold
font is used in the column for “KTable <codeclass="docutils literal"><spanclass="pre">aggregated</span></code>” to highlight changed state. An entry such as <codeclass="docutils literal"><spanclass="pre">(hello,</span><spanclass="pre">1)</span></code> denotes a
record with key <codeclass="docutils literal"><spanclass="pre">hello</span></code> and value <codeclass="docutils literal"><spanclass="pre">1</span></code>. To improve the readability of the semantics table you can assume that all records
are processed in timestamp order.</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span><spanclass="c1">// Key: word, value: count</span>
<spanclass="n">Materialized</span><spanclass="o">.<</span><spanclass="n">String</span><spanclass="o">,</span><spanclass="n">Long</span><spanclass="o">,</span><spanclass="n">KeyValueStore</span><spanclass="o"><</span><spanclass="n">Bytes</span><spanclass="o">,</span><spanclass="kt">byte</span><spanclass="o">[]></span><spanclass="n">as</span><spanclass="o">(</span><spanclass="s">"aggregated-stream-store"</span><spanclass="cm">/* state store name */</span><spanclass="o">)</span>
<spanclass="o">.</span><spanclass="na">withValueSerde</span><spanclass="o">(</span><spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Integer</span><spanclass="o">());</span><spanclass="cm">/* serde for aggregate value */</span>
</pre></div>
</div>
<divclass="admonition note">
<pclass="first admonition-title">Note</p>
<pclass="last"><strong>Impact of record caches</strong>:
For illustration purposes, the column “KTable <codeclass="docutils literal"><spanclass="pre">aggregated</span></code>” below shows the table’s state changes over time in a
very granular way. In practice, you would observe state changes in such a granular way only when
<aclass="reference internal"href="memory-mgmt.html#streams-developer-guide-memory-management-record-cache"><spanclass="std std-ref">record caches</span></a> are disabled (default: enabled).
When record caches are enabled, what might happen for example is that the output results of the rows with timestamps
4 and 5 would be <a class="reference internal" href="memory-mgmt.html#streams-developer-guide-memory-management-record-cache"><span class="std std-ref">compacted</span></a>, and there would only be
a single state update for the key <code class="docutils literal"><span class="pre">kafka</span></code> in the KTable (here: from <code class="docutils literal"><span class="pre">(kafka, 1)</span></code> directly to <code class="docutils literal"><span class="pre">(kafka, 3)</span></code>).
Typically, you should only disable record caches for testing or debugging purposes – under normal circumstances it
is recommended to leave record caches enabled.</p>
</div>
<p><strong>Example of semantics for table aggregations:</strong>
A <codeclass="docutils literal"><spanclass="pre">KGroupedTable</span></code>→<codeclass="docutils literal"><spanclass="pre">KTable</span></code> example is shown below. The tables are initially empty. Bold font is used in the column
for “KTable <codeclass="docutils literal"><spanclass="pre">aggregated</span></code>” to highlight changed state. An entry such as <codeclass="docutils literal"><spanclass="pre">(hello,</span><spanclass="pre">1)</span></code> denotes a record with key
<codeclass="docutils literal"><spanclass="pre">hello</span></code> and value <codeclass="docutils literal"><spanclass="pre">1</span></code>. To improve the readability of the semantics table you can assume that all records are processed
in timestamp order.</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span><spanclass="c1">// Key: username, value: user region (abbreviated to "E" for "Europe", "A" for "Asia")</span>
<spanclass="n">Materialized</span><spanclass="o">.<</span><spanclass="n">String</span><spanclass="o">,</span><spanclass="n">Long</span><spanclass="o">,</span><spanclass="n">KeyValueStore</span><spanclass="o"><</span><spanclass="n">Bytes</span><spanclass="o">,</span><spanclass="kt">byte</span><spanclass="o">[]></span><spanclass="n">as</span><spanclass="o">(</span><spanclass="s">"aggregated-table-store"</span><spanclass="cm">/* state store name */</span><spanclass="o">)</span>
<spanclass="o">.</span><spanclass="na">withValueSerde</span><spanclass="o">(</span><spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Integer</span><spanclass="o">());</span><spanclass="cm">/* serde for aggregate value */</span>
</pre></div>
</div>
<divclass="admonition note">
<pclass="first admonition-title">Note</p>
<pclass="last"><strong>Impact of record caches</strong>:
For illustration purposes, the column “KTable <codeclass="docutils literal"><spanclass="pre">aggregated</span></code>” below shows the table’s state changes over time in a
very granular way. In practice, you would observe state changes in such a granular way only when
<aclass="reference internal"href="memory-mgmt.html#streams-developer-guide-memory-management-record-cache"><spanclass="std std-ref">record caches</span></a> are disabled (default: enabled).
When record caches are enabled, what might happen for example is that the output results of the rows with timestamps
4 and 5 would be <a class="reference internal" href="memory-mgmt.html#streams-developer-guide-memory-management-record-cache"><span class="std std-ref">compacted</span></a>, and there would only be
a single state update for the key <code class="docutils literal"><span class="pre">kafka</span></code> in the KTable (here: from <code class="docutils literal"><span class="pre">(kafka, 1)</span></code> directly to <code class="docutils literal"><span class="pre">(kafka, 3)</span></code>).
Typically, you should only disable record caches for testing or debugging purposes – under normal circumstances it
is recommended to leave record caches enabled.</p>
</div>
<spanid="streams-developer-guide-dsl-joins"></span><h4><aclass="toc-backref"href="#id13">Joining</a><aclass="headerlink"href="#joining"title="Permalink to this headline"></a></h4>
<pid="streams-developer-guide-dsl-joins-overview">Streams and tables can also be joined. Many stream processing applications in practice are coded as streaming joins.
For example, applications backing an online shop might need to access multiple, updating database tables (e.g. sales
prices, inventory, customer information) in order to enrich a new data record (e.g. customer transaction) with context
information. That is, scenarios where you need to perform table lookups at very large scale and with a low processing
latency. Here, a popular pattern is to make the information in the databases available in Kafka through so-called
<em>change data capture</em> in combination with <aclass="reference internal"href="../../connect/index.html#kafka-connect"><spanclass="std std-ref">Kafka’s Connect API</span></a>, and then implementing
applications that leverage the Streams API to perform very fast and efficient local joins
of such tables and streams, rather than requiring the application to make a query to a remote database over the network
for each record. In this example, the KTable concept in Kafka Streams would enable you to track the latest state
(e.g., snapshot) of each table in a local state store, thus greatly reducing the processing latency as well as
reducing the load of the remote databases when doing such streaming joins.</p>
<p>The following join operations are supported, see also the diagram in the
<a class="reference internal" href="#streams-developer-guide-dsl-transformations-stateful-overview"><span class="std std-ref">overview section</span></a> of
<a class="reference internal" href="#streams-developer-guide-dsl-transformations-stateful"><span class="std std-ref">Stateful transformations</span></a>.
Depending on the operands, joins are either <a class="reference internal" href="#streams-developer-guide-dsl-windowing"><span class="std std-ref">windowed</span></a> joins or
non-windowed joins.</p>
<spanid="streams-developer-guide-dsl-joins-co-partitioning"></span><h5><aclass="toc-backref"href="#id14">Join co-partitioning requirements</a><aclass="headerlink"href="#join-co-partitioning-requirements"title="Permalink to this headline"></a></h5>
<p>Input data must be co-partitioned when joining. This ensures that input records with the same key, from both sides of the
join, are delivered to the same stream task during processing.
<strong>It is the responsibility of the user to ensure data co-partitioning when joining</strong>.</p>
<divclass="admonition tip">
<pclass="first admonition-title">Tip</p>
<pclass="last">If possible, consider using <aclass="reference internal"href="../concepts.html#streams-concepts-globalktable"><spanclass="std std-ref">global tables</span></a> (<codeclass="docutils literal"><spanclass="pre">GlobalKTable</span></code>) for joining because they do not require data co-partitioning.</p>
</div>
<p>The requirements for data co-partitioning are:</p>
<ulclass="simple">
<li>The input topics of the join (left side and right side) must have the <strong>same number of partitions</strong>.</li>
<li>All applications that <em>write</em> to the input topics must have the <strong>same partitioning strategy</strong> so that records with
the same key are delivered to the same partition number. In other words, the keyspace of the input data must be
distributed across partitions in the same manner.
This means that, for example, applications that use Kafka’s <aclass="reference internal"href="../../clients/index.html#kafka-clients"><spanclass="std std-ref">Java Producer API</span></a> must use the
same partitioner (cf. the producer setting <codeclass="docutils literal"><spanclass="pre">"partitioner.class"</span></code> aka <codeclass="docutils literal"><spanclass="pre">ProducerConfig.PARTITIONER_CLASS_CONFIG</span></code>),
and applications that use Kafka’s Streams API must use the same <code class="docutils literal"><span class="pre">StreamPartitioner</span></code> for operations such as
<codeclass="docutils literal"><spanclass="pre">KStream#to()</span></code>. The good news is that, if you happen to use the default partitioner-related settings across all
applications, you do not need to worry about the partitioning strategy.</li>
<p>Why is data co-partitioning required? Because KStream-KStream, KTable-KTable, and KStream-KTable joins
are performed based on the keys of records (e.g., <code class="docutils literal"><span class="pre">leftRecord.key == rightRecord.key</span></code>), it is required that the
input streams/tables of a join are co-partitioned by key.</p>
<p>The only exception is
<a class="reference internal" href="#streams-developer-guide-dsl-joins-kstream-globalktable"><span class="std std-ref">KStream-GlobalKTable joins</span></a>. Here, co-partitioning is
not required because <em>all</em> partitions of the <code class="docutils literal"><span class="pre">GlobalKTable</span></code>‘s underlying changelog stream are made available to
each <codeclass="docutils literal"><spanclass="pre">KafkaStreams</span></code> instance, i.e. each instance has a full copy of the changelog stream. Further, a
<codeclass="docutils literal"><spanclass="pre">KeyValueMapper</span></code> allows for non-key based joins from the <codeclass="docutils literal"><spanclass="pre">KStream</span></code> to the <codeclass="docutils literal"><spanclass="pre">GlobalKTable</span></code>.</p>
<divclass="admonition note">
<pclass="first admonition-title">Note</p>
<pclass="last"><strong>Kafka Streams partly verifies the co-partitioning requirement:</strong>
During the partition assignment step, i.e. at runtime, Kafka Streams verifies whether the number of partitions for
both sides of a join are the same. If they are not, a <code class="docutils literal"><span class="pre">TopologyBuilderException</span></code> (runtime exception) is
thrown. Note that Kafka Streams cannot verify whether the partitioning strategy matches between the input
streams/tables of a join – it is up to the user to ensure that this is the case.</p>
</div>
<p><strong>Ensuring data co-partitioning:</strong> If the inputs of a join are not co-partitioned yet, you must ensure this manually.
You may follow a procedure such as outlined below.</p>
<olclass="arabic">
<li><pclass="first">Identify the input KStream/KTable in the join whose underlying Kafka topic has the smaller number of partitions.
Let’s call this stream/table “SMALLER”, and the other side of the join “LARGER”. To learn about the number of
partitions of a Kafka topic you can use, for example, the CLI tool <codeclass="docutils literal"><spanclass="pre">bin/kafka-topics</span></code> with the <codeclass="docutils literal"><spanclass="pre">--describe</span></code>
option.</p>
</li>
<li><pclass="first">Pre-create a new Kafka topic for “SMALLER” that has the same number of partitions as “LARGER”. Let’s call this
new topic “repartitioned-topic-for-smaller”. Typically, you’d use the CLI tool <codeclass="docutils literal"><spanclass="pre">bin/kafka-topics</span></code> with the
<codeclass="docutils literal"><spanclass="pre">--create</span></code> option for this.</p>
</li>
<li><pclass="first">Within your application, re-write the data of “SMALLER” into the new Kafka topic. You must ensure that, when writing
the data with <codeclass="docutils literal"><spanclass="pre">to</span></code> or <codeclass="docutils literal"><spanclass="pre">through</span></code>, the same partitioner is used as for “LARGER”.</p>
<blockquote>
<div><ulclass="simple">
<li>If “SMALLER” is a KStream: <codeclass="docutils literal"><spanclass="pre">KStream#to("repartitioned-topic-for-smaller")</span></code>.</li>
<li>If “SMALLER” is a KTable: <code class="docutils literal"><span class="pre">KTable#toStream()#to("repartitioned-topic-for-smaller")</span></code>.</li>
</ul>
</div></blockquote>
</li>
<li><pclass="first">Within your application, re-read the data in “repartitioned-topic-for-smaller” into
a new KStream/KTable.</p>
<blockquote>
<div><ulclass="simple">
<li>If “SMALLER” is a KStream: <codeclass="docutils literal"><spanclass="pre">StreamsBuilder#stream("repartitioned-topic-for-smaller")</span></code>.</li>
<li>If “SMALLER” is a KTable: <codeclass="docutils literal"><spanclass="pre">StreamsBuilder#table("repartitioned-topic-for-smaller")</span></code>.</li>
</ul>
</div></blockquote>
</li>
<li><pclass="first">Within your application, perform the join between “LARGER” and the new stream/table.</p>
</li>
</ol>
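<p>A sketch of steps 3–5 in code (the topic names follow the procedure above; SerDes and the join window are illustrative):</p>
<div class="highlight-java"><div class="highlight"><pre><span></span>// 3. Re-write the data of "SMALLER" into the pre-created topic, making sure
//    the same partitioner is used as for "LARGER".
KStream<String, Long> smaller = builder.stream("smaller-input-topic");
smaller.to("repartitioned-topic-for-smaller");

// 4. Re-read the repartitioned data into a new KStream ...
KStream<String, Long> smallerRepartitioned = builder.stream("repartitioned-topic-for-smaller");

// 5. ... and perform the join against "LARGER".
KStream<String, Long> larger = builder.stream("larger-input-topic");
KStream<String, String> joined = larger.join(smallerRepartitioned,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
    JoinWindows.of(TimeUnit.MINUTES.toMillis(5)));
</pre></div></div>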
</div>
<divclass="section"id="kstream-kstream-join">
<spanid="streams-developer-guide-dsl-joins-kstream-kstream"></span><h5><aclass="toc-backref"href="#id15">KStream-KStream Join</a><aclass="headerlink"href="#kstream-kstream-join"title="Permalink to this headline"></a></h5>
<p>KStream-KStream joins are always <aclass="reference internal"href="#windowing-sliding"><spanclass="std std-ref">windowed</span></a> joins, because otherwise the size of the
internal state store used to perform the join – e.g., a <a class="reference internal" href="#windowing-sliding"><span class="std std-ref">sliding window</span></a> or “buffer” – would
grow indefinitely. For stream-stream joins it’s important to highlight that a new input record on one side will
produce a join output <em>for each</em> matching record on the other side, and there can be <em>multiple</em> such matching records
in a given join window (cf. the row with timestamp 15 in the join semantics table below, for example).</p>
<p>Join output records are effectively created as follows, leveraging the user-supplied <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code>:</p>
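<p>A sketch of the mechanics (the generic type names are placeholders):</p>
<div class="highlight-java"><div class="highlight"><pre><span></span>KeyValue<K, LV> leftRecord = ...;
KeyValue<K, RV> rightRecord = ...;
ValueJoiner<LV, RV, JV> joiner = ...;
KeyValue<K, JV> joinOutputRecord = KeyValue.pair(
    leftRecord.key, /* by definition, leftRecord.key == rightRecord.key */
    joiner.apply(leftRecord.value, rightRecord.value)
  );
</pre></div></div>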
<td><pclass="first">Performs an INNER JOIN of this stream with another stream.
Even though this operation is windowed, the joined stream will be of type <codeclass="docutils literal"><spanclass="pre">KStream<K,</span><spanclass="pre">...></span></code> rather than <codeclass="docutils literal"><spanclass="pre">KStream<Windowed<K>,</span><spanclass="pre">...></span></code>.
<p><strong>Data must be co-partitioned</strong>: The input data for both sides must be <aclass="reference internal"href="#streams-developer-guide-dsl-joins-co-partitioning"><spanclass="std std-ref">co-partitioned</span></a>.</p>
<p><strong>Causes data re-partitioning of a stream if and only if the stream was marked for re-partitioning (if both are marked, both are re-partitioned).</strong></p>
<p>Several variants of <codeclass="docutils literal"><spanclass="pre">join</span></code> exist; see the Javadocs for details.</p>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Long</span><spanclass="o">(),</span><spanclass="cm">/* left value */</span>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Double</span><spanclass="o">())</span><spanclass="cm">/* right value */</span>
<spanclass="o">);</span>
</pre></div>
</div>
<p>Detailed behavior:</p>
<ul>
<li><pclass="first">The join is <em>key-based</em>, i.e. with the join predicate <codeclass="docutils literal"><spanclass="pre">leftRecord.key</span><spanclass="pre">==</span><spanclass="pre">rightRecord.key</span></code>, and <em>window-based</em>, i.e. two input records are joined if and only if their
timestamps are “close” to each other as defined by the user-supplied <codeclass="docutils literal"><spanclass="pre">JoinWindows</span></code>, i.e. the window defines an additional join predicate over the record timestamps.</p>
</li>
<li><pclass="first">The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called to produce
join output records.</p>
<blockquote>
<div><ulclass="simple">
<li>Input records with a <codeclass="docutils literal"><spanclass="pre">null</span></code> key or a <codeclass="docutils literal"><spanclass="pre">null</span></code> value are ignored and do not trigger the join.</li>
</ul>
</div></blockquote>
</li>
</ul>
<pclass="last">See the semantics overview at the bottom of this section for a detailed description.</p>
<td><pclass="first">Performs a LEFT JOIN of this stream with another stream.
Even though this operation is windowed, the joined stream will be of type <codeclass="docutils literal"><spanclass="pre">KStream<K,</span><spanclass="pre">...></span></code> rather than <codeclass="docutils literal"><spanclass="pre">KStream<Windowed<K>,</span><spanclass="pre">...></span></code>.
<p><strong>Data must be co-partitioned</strong>: The input data for both sides must be <aclass="reference internal"href="#streams-developer-guide-dsl-joins-co-partitioning"><spanclass="std std-ref">co-partitioned</span></a>.</p>
<p><strong>Causes data re-partitioning of a stream if and only if the stream was marked for re-partitioning (if both are marked, both are re-partitioned).</strong></p>
<p>Several variants of <codeclass="docutils literal"><spanclass="pre">leftJoin</span></code> exist; see the Javadocs for details.</p>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Long</span><spanclass="o">(),</span><spanclass="cm">/* left value */</span>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Double</span><spanclass="o">())</span><spanclass="cm">/* right value */</span>
<spanclass="o">);</span>
</pre></div>
</div>
<p>Detailed behavior:</p>
<ul>
<li><pclass="first">The join is <em>key-based</em>, i.e. with the join predicate <codeclass="docutils literal"><spanclass="pre">leftRecord.key</span><spanclass="pre">==</span><spanclass="pre">rightRecord.key</span></code>, and <em>window-based</em>, i.e. two input records are joined if and only if their
timestamps are “close” to each other as defined by the user-supplied <codeclass="docutils literal"><spanclass="pre">JoinWindows</span></code>, i.e. the window defines an additional join predicate over the record timestamps.</p>
</li>
<li><pclass="first">The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called to produce
join output records.</p>
<blockquote>
<div><ulclass="simple">
<li>Input records with a <codeclass="docutils literal"><spanclass="pre">null</span></code> key or a <codeclass="docutils literal"><spanclass="pre">null</span></code> value are ignored and do not trigger the join.</li>
</ul>
</div></blockquote>
</li>
<li><pclass="first">For each input record on the left side that does not have any match on the right side, the <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called with <codeclass="docutils literal"><spanclass="pre">ValueJoiner#apply(leftRecord.value,</span><spanclass="pre">null)</span></code>;
this explains the row with timestamp=3 in the table below, which lists <codeclass="docutils literal"><spanclass="pre">[A,</span><spanclass="pre">null]</span></code> in the LEFT JOIN column.</p>
</li>
</ul>
<pclass="last">See the semantics overview at the bottom of this section for a detailed description.</p>
<td><pclass="first">Performs an OUTER JOIN of this stream with another stream.
Even though this operation is windowed, the joined stream will be of type <codeclass="docutils literal"><spanclass="pre">KStream<K,</span><spanclass="pre">...></span></code> rather than <codeclass="docutils literal"><spanclass="pre">KStream<Windowed<K>,</span><spanclass="pre">...></span></code>.
<p><strong>Data must be co-partitioned</strong>: The input data for both sides must be <aclass="reference internal"href="#streams-developer-guide-dsl-joins-co-partitioning"><spanclass="std std-ref">co-partitioned</span></a>.</p>
<p><strong>Causes data re-partitioning of a stream if and only if the stream was marked for re-partitioning (if both are marked, both are re-partitioned).</strong></p>
<p>Several variants of <codeclass="docutils literal"><spanclass="pre">outerJoin</span></code> exist; see the Javadocs for details.</p>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Long</span><spanclass="o">(),</span><spanclass="cm">/* left value */</span>
<spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Double</span><spanclass="o">())</span><spanclass="cm">/* right value */</span>
<spanclass="o">);</span>
</pre></div>
</div>
<p>Detailed behavior:</p>
<ul>
<li><pclass="first">The join is <em>key-based</em>, i.e. with the join predicate <codeclass="docutils literal"><spanclass="pre">leftRecord.key</span><spanclass="pre">==</span><spanclass="pre">rightRecord.key</span></code>, and <em>window-based</em>, i.e. two input records are joined if and only if their
timestamps are “close” to each other as defined by the user-supplied <codeclass="docutils literal"><spanclass="pre">JoinWindows</span></code>, i.e. the window defines an additional join predicate over the record timestamps.</p>
</li>
<li><pclass="first">The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called to produce
join output records.</p>
<blockquote>
<div><ulclass="simple">
<li>Input records with a <codeclass="docutils literal"><spanclass="pre">null</span></code> key or a <codeclass="docutils literal"><spanclass="pre">null</span></code> value are ignored and do not trigger the join.</li>
</ul>
</div></blockquote>
</li>
<li><pclass="first">For each input record on one side that does not have any match on the other side, the <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called with <codeclass="docutils literal"><spanclass="pre">ValueJoiner#apply(leftRecord.value,</span><spanclass="pre">null)</span></code> or
<codeclass="docutils literal"><spanclass="pre">ValueJoiner#apply(null,</span><spanclass="pre">rightRecord.value)</span></code>, respectively; this explains the row with timestamp=3 in the table below, which lists <codeclass="docutils literal"><spanclass="pre">[A,</span><spanclass="pre">null]</span></code> in the OUTER JOIN column
(unlike LEFT JOIN, <codeclass="docutils literal"><spanclass="pre">[null,</span><spanclass="pre">x]</span></code> is possible, too, but no such example is shown in the table).</p>
</li>
</ul>
<pclass="last">See the semantics overview at the bottom of this section for a detailed description.</p>
</td>
</tr>
</tbody>
</table>
<p><strong>Semantics of stream-stream joins:</strong>
The semantics of the various stream-stream join variants are explained below.
To improve the readability of the table, assume that (1) all records have the same key (and thus the key in the table is omitted), (2) all records belong to a single join window, and (3) all records are processed in timestamp order.
The columns INNER JOIN, LEFT JOIN, and OUTER JOIN denote what is passed as arguments to the user-supplied
<aclass="reference external"href="../javadocs/org/apache/kafka/streams/kstream/ValueJoiner.html">ValueJoiner</a> for the <codeclass="docutils literal"><spanclass="pre">join</span></code>, <codeclass="docutils literal"><spanclass="pre">leftJoin</span></code>, and
<codeclass="docutils literal"><spanclass="pre">outerJoin</span></code> methods, respectively, whenever a new input record is received on either side of the join. An empty table
cell denotes that the <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> is not called at all.</p>
<tableborder="1"class="docutils">
<colgroup>
<colwidth="8%"/>
<colwidth="13%"/>
<colwidth="13%"/>
<colwidth="22%"/>
<colwidth="22%"/>
<colwidth="22%"/>
</colgroup>
<theadvalign="bottom">
<trclass="row-odd"><thclass="head">Timestamp</th>
<thclass="head">Left (KStream)</th>
<thclass="head">Right (KStream)</th>
<thclass="head">(INNER) JOIN</th>
<thclass="head">LEFT JOIN</th>
<thclass="head">OUTER JOIN</th>
</tr>
</thead>
<tbodyvalign="top">
<trclass="row-even"><td>1</td>
<td>null</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<trclass="row-odd"><td>2</td>
<td> </td>
<td>null</td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<trclass="row-even"><td>3</td>
<td>A</td>
<td> </td>
<td> </td>
<td>[A, null]</td>
<td>[A, null]</td>
</tr>
<trclass="row-odd"><td>4</td>
<td> </td>
<td>a</td>
<td>[A, a]</td>
<td>[A, a]</td>
<td>[A, a]</td>
</tr>
<trclass="row-even"><td>5</td>
<td>B</td>
<td> </td>
<td>[B, a]</td>
<td>[B, a]</td>
<td>[B, a]</td>
</tr>
<trclass="row-odd"><td>6</td>
<td> </td>
<td>b</td>
<td>[A, b], [B, b]</td>
<td>[A, b], [B, b]</td>
<td>[A, b], [B, b]</td>
</tr>
<trclass="row-even"><td>7</td>
<td>null</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<trclass="row-odd"><td>8</td>
<td> </td>
<td>null</td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<trclass="row-even"><td>9</td>
<td>C</td>
<td> </td>
<td>[C, a], [C, b]</td>
<td>[C, a], [C, b]</td>
<td>[C, a], [C, b]</td>
</tr>
<trclass="row-odd"><td>10</td>
<td> </td>
<td>c</td>
<td>[A, c], [B, c], [C, c]</td>
<td>[A, c], [B, c], [C, c]</td>
<td>[A, c], [B, c], [C, c]</td>
</tr>
<trclass="row-even"><td>11</td>
<td> </td>
<td>null</td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<trclass="row-odd"><td>12</td>
<td>null</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<trclass="row-even"><td>13</td>
<td> </td>
<td>null</td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<trclass="row-odd"><td>14</td>
<td> </td>
<td>d</td>
<td>[A, d], [B, d], [C, d]</td>
<td>[A, d], [B, d], [C, d]</td>
<td>[A, d], [B, d], [C, d]</td>
</tr>
<trclass="row-even"><td>15</td>
<td>D</td>
<td> </td>
<td>[D, a], [D, b], [D, c], [D, d]</td>
<td>[D, a], [D, b], [D, c], [D, d]</td>
<td>[D, a], [D, b], [D, c], [D, d]</td>
</tr>
</tbody>
</table>
</div>
<divclass="section"id="ktable-ktable-join">
<spanid="streams-developer-guide-dsl-joins-ktable-ktable"></span><h5><aclass="toc-backref"href="#id16">KTable-KTable Join</a><aclass="headerlink"href="#ktable-ktable-join"title="Permalink to this headline"></a></h5>
<p>KTable-KTable joins are always <em>non-windowed</em> joins. They are designed to be consistent with their counterparts in
relational databases. The changelog streams of both KTables are materialized into local state stores to represent the
latest snapshot of their <aclass="reference internal"href="../concepts.html#streams-concepts-ktable"><spanclass="std std-ref">table duals</span></a>.
The join result is a new KTable that represents the changelog stream of the join operation.</p>
<p>Join output records are effectively created as follows, leveraging the user-supplied <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code>:</p>
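<p>Schematically, as for stream-stream joins, except that the left and right records are the latest (non-tombstone) table values for a key:</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span>KeyValue<K, LV> leftRecord = ...;     // latest value for the key in the left table
KeyValue<K, RV> rightRecord = ...;    // latest value for the key in the right table
ValueJoiner<LV, RV, JV> joiner = ...; // user-supplied

KeyValue<K, JV> joinOutputRecord = KeyValue.pair(
    leftRecord.key, /* by definition, leftRecord.key == rightRecord.key */
    joiner.apply(leftRecord.value, rightRecord.value)
  );
</pre></div>
</div>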
<p><strong>Data must be co-partitioned</strong>: The input data for both sides must be <aclass="reference internal"href="#streams-developer-guide-dsl-joins-co-partitioning"><spanclass="std std-ref">co-partitioned</span></a>.</p>
<li><pclass="first">The join is <em>key-based</em>, i.e. with the join predicate <codeclass="docutils literal"><spanclass="pre">leftRecord.key</span><spanclass="pre">==</span><spanclass="pre">rightRecord.key</span></code>.</p>
</li>
<li><pclass="first">The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called to produce
join output records.</p>
<blockquote>
<div><ulclass="simple">
<li>Input records with a <codeclass="docutils literal"><spanclass="pre">null</span></code> key are ignored and do not trigger the join.</li>
<li>Input records with a <codeclass="docutils literal"><spanclass="pre">null</span></code> value are interpreted as <em>tombstones</em> for the corresponding key, which indicate the deletion of the key from the table. Tombstones do not
trigger the join. When an input tombstone is received, then an output tombstone is forwarded directly to the join result KTable if required (i.e. only if the corresponding
key actually exists already in the join result KTable).</li>
</ul>
</div></blockquote>
</li>
</ul>
<pclass="last">See the semantics overview at the bottom of this section for a detailed description.</p>
<p><strong>Data must be co-partitioned</strong>: The input data for both sides must be <aclass="reference internal"href="#streams-developer-guide-dsl-joins-co-partitioning"><spanclass="std std-ref">co-partitioned</span></a>.</p>
<li><pclass="first">The join is <em>key-based</em>, i.e. with the join predicate <codeclass="docutils literal"><spanclass="pre">leftRecord.key</span><spanclass="pre">==</span><spanclass="pre">rightRecord.key</span></code>.</p>
</li>
<li><pclass="first">The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called to produce
join output records.</p>
<blockquote>
<div><ulclass="simple">
<li>Input records with a <codeclass="docutils literal"><spanclass="pre">null</span></code> key are ignored and do not trigger the join.</li>
<li>Input records with a <codeclass="docutils literal"><spanclass="pre">null</span></code> value are interpreted as <em>tombstones</em> for the corresponding key, which indicate the deletion of the key from the table. Tombstones do not
trigger the join. When an input tombstone is received, then an output tombstone is forwarded directly to the join result KTable if required (i.e. only if the corresponding
key actually exists already in the join result KTable).</li>
</ul>
</div></blockquote>
</li>
<li><pclass="first">For each input record on the left side that does not have any match on the right side, the <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called with <codeclass="docutils literal"><spanclass="pre">ValueJoiner#apply(leftRecord.value,</span><spanclass="pre">null)</span></code>;
this explains the row with timestamp=3 in the table below, which lists <codeclass="docutils literal"><spanclass="pre">[A,</span><spanclass="pre">null]</span></code> in the LEFT JOIN column.</p>
</li>
</ul>
<pclass="last">See the semantics overview at the bottom of this section for a detailed description.</p>
<p><strong>Data must be co-partitioned</strong>: The input data for both sides must be <aclass="reference internal"href="#streams-developer-guide-dsl-joins-co-partitioning"><spanclass="std std-ref">co-partitioned</span></a>.</p>
<li><pclass="first">The join is <em>key-based</em>, i.e. with the join predicate <codeclass="docutils literal"><spanclass="pre">leftRecord.key</span><spanclass="pre">==</span><spanclass="pre">rightRecord.key</span></code>.</p>
</li>
<li><pclass="first">The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called to produce
join output records.</p>
<blockquote>
<div><ulclass="simple">
<li>Input records with a <codeclass="docutils literal"><spanclass="pre">null</span></code> key are ignored and do not trigger the join.</li>
<li>Input records with a <codeclass="docutils literal"><spanclass="pre">null</span></code> value are interpreted as <em>tombstones</em> for the corresponding key, which indicate the deletion of the key from the table. Tombstones do not
trigger the join. When an input tombstone is received, then an output tombstone is forwarded directly to the join result KTable if required (i.e. only if the corresponding
key actually exists already in the join result KTable).</li>
</ul>
</div></blockquote>
</li>
<li><pclass="first">For each input record on one side that does not have any match on the other side, the <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called with <codeclass="docutils literal"><spanclass="pre">ValueJoiner#apply(leftRecord.value,</span><spanclass="pre">null)</span></code> or
<codeclass="docutils literal"><spanclass="pre">ValueJoiner#apply(null,</span><spanclass="pre">rightRecord.value)</span></code>, respectively; this explains the rows with timestamp=3 and timestamp=7 in the table below, which list <codeclass="docutils literal"><spanclass="pre">[A,</span><spanclass="pre">null]</span></code> and
<codeclass="docutils literal"><spanclass="pre">[null,</span><spanclass="pre">b]</span></code>, respectively, in the OUTER JOIN column.</p>
</li>
</ul>
<pclass="last">See the semantics overview at the bottom of this section for a detailed description.</p>
</td>
</tr>
</tbody>
</table>
<p><strong>Semantics of table-table joins:</strong>
The semantics of the various table-table join variants are explained below.
To improve the readability of the table, you can assume that (1) all records have the same key (and thus the key in the table is omitted) and that (2) all records are processed in timestamp order.
The columns INNER JOIN, LEFT JOIN, and OUTER JOIN denote what is passed as arguments to the user-supplied
<aclass="reference external"href="../javadocs/org/apache/kafka/streams/kstream/ValueJoiner.html">ValueJoiner</a> for the <codeclass="docutils literal"><spanclass="pre">join</span></code>, <codeclass="docutils literal"><spanclass="pre">leftJoin</span></code>, and
<codeclass="docutils literal"><spanclass="pre">outerJoin</span></code> methods, respectively, whenever a new input record is received on either side of the join. An empty table
cell denotes that the <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> is not called at all.</p>
<tableborder="1"class="docutils">
<colgroup>
<colwidth="8%"/>
<colwidth="13%"/>
<colwidth="13%"/>
<colwidth="22%"/>
<colwidth="22%"/>
<colwidth="22%"/>
</colgroup>
<theadvalign="bottom">
<trclass="row-odd"><thclass="head">Timestamp</th>
<thclass="head">Left (KTable)</th>
<thclass="head">Right (KTable)</th>
<thclass="head">(INNER) JOIN</th>
<thclass="head">LEFT JOIN</th>
<thclass="head">OUTER JOIN</th>
</tr>
</thead>
<tbodyvalign="top">
<trclass="row-even"><td>1</td>
<td>null (tombstone)</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<trclass="row-odd"><td>2</td>
<td> </td>
<td>null (tombstone)</td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<trclass="row-even"><td>3</td>
<td>A</td>
<td> </td>
<td> </td>
<td>[A, null]</td>
<td>[A, null]</td>
</tr>
<trclass="row-odd"><td>4</td>
<td> </td>
<td>a</td>
<td>[A, a]</td>
<td>[A, a]</td>
<td>[A, a]</td>
</tr>
<trclass="row-even"><td>5</td>
<td>B</td>
<td> </td>
<td>[B, a]</td>
<td>[B, a]</td>
<td>[B, a]</td>
</tr>
<trclass="row-odd"><td>6</td>
<td> </td>
<td>b</td>
<td>[B, b]</td>
<td>[B, b]</td>
<td>[B, b]</td>
</tr>
<trclass="row-even"><td>7</td>
<td>null (tombstone)</td>
<td> </td>
<td>null (tombstone)</td>
<td>null (tombstone)</td>
<td>[null, b]</td>
</tr>
<trclass="row-odd"><td>8</td>
<td> </td>
<td>null (tombstone)</td>
<td> </td>
<td> </td>
<td>null (tombstone)</td>
</tr>
<trclass="row-even"><td>9</td>
<td>C</td>
<td> </td>
<td> </td>
<td>[C, null]</td>
<td>[C, null]</td>
</tr>
<trclass="row-odd"><td>10</td>
<td> </td>
<td>c</td>
<td>[C, c]</td>
<td>[C, c]</td>
<td>[C, c]</td>
</tr>
<trclass="row-even"><td>11</td>
<td> </td>
<td>null (tombstone)</td>
<td>null (tombstone)</td>
<td>[C, null]</td>
<td>[C, null]</td>
</tr>
<trclass="row-odd"><td>12</td>
<td>null (tombstone)</td>
<td> </td>
<td> </td>
<td>null (tombstone)</td>
<td>null (tombstone)</td>
</tr>
<trclass="row-even"><td>13</td>
<td> </td>
<td>null (tombstone)</td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<trclass="row-odd"><td>14</td>
<td> </td>
<td>d</td>
<td> </td>
<td> </td>
<td>[null, d]</td>
</tr>
<trclass="row-even"><td>15</td>
<td>D</td>
<td> </td>
<td>[D, d]</td>
<td>[D, d]</td>
<td>[D, d]</td>
</tr>
</tbody>
</table>
</div>
<divclass="section"id="kstream-ktable-join">
<spanid="streams-developer-guide-dsl-joins-kstream-ktable"></span><h5><aclass="toc-backref"href="#id17">KStream-KTable Join</a><aclass="headerlink"href="#kstream-ktable-join"title="Permalink to this headline"></a></h5>
<p>KStream-KTable joins are always <em>non-windowed</em> joins. They allow you to perform <em>table lookups</em> against a KTable
(changelog stream) upon receiving a new record from the KStream (record stream). An example use case would be to enrich
a stream of user activities (KStream) with the latest user profile information (KTable).</p>
<p>Join output records are effectively created as follows, leveraging the user-supplied <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code>:</p>
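<p>Schematically (placeholder types as before; in the <codeclass="docutils literal"><spanclass="pre">leftJoin</span></code> case the table-side value may be <codeclass="docutils literal"><spanclass="pre">null</span></code>):</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span>KeyValue<K, LV> leftRecord = ...;     // new record arriving on the stream (left) side
KeyValue<K, RV> rightRecord = ...;    // current table-side value for leftRecord.key
ValueJoiner<LV, RV, JV> joiner = ...; // user-supplied

KeyValue<K, JV> joinOutputRecord = KeyValue.pair(
    leftRecord.key,
    joiner.apply(leftRecord.value, rightRecord.value)
  );
</pre></div>
</div>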
<p><strong>Data must be co-partitioned</strong>: The input data for both sides must be <aclass="reference internal"href="#streams-developer-guide-dsl-joins-co-partitioning"><spanclass="std std-ref">co-partitioned</span></a>.</p>
<p><strong>Causes data re-partitioning of the stream if and only if the stream was marked for re-partitioning.</strong></p>
<p>Several variants of <codeclass="docutils literal"><spanclass="pre">join</span></code> exist; see the Javadocs for details.</p>
<spanclass="o">.</span><spanclass="na">withValueSerde</span><spanclass="o">(</span><spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Long</span><spanclass="o">())</span><spanclass="cm">/* left value */</span>
<spanclass="o">.</span><spanclass="na">withValueSerde</span><spanclass="o">(</span><spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Long</span><spanclass="o">())</span><spanclass="cm">/* left value */</span>
<spanclass="o">);</span>
</pre></div>
</div>
<p>Detailed behavior:</p>
<ul>
<li><pclass="first">The join is <em>key-based</em>, i.e. with the join predicate <codeclass="docutils literal"><spanclass="pre">leftRecord.key</span><spanclass="pre">==</span><spanclass="pre">rightRecord.key</span></code>.</p>
</li>
<li><pclass="first">The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called to produce
join output records.</p>
<blockquote>
<div><ulclass="simple">
<li>Only input records for the left side (stream) trigger the join. Input records for the right side (table) update only the internal right-side join state.</li>
<li>Input records for the stream with a <codeclass="docutils literal"><spanclass="pre">null</span></code> key or a <codeclass="docutils literal"><spanclass="pre">null</span></code> value are ignored and do not trigger the join.</li>
<li>Input records for the table with a <codeclass="docutils literal"><spanclass="pre">null</span></code> value are interpreted as <em>tombstones</em> for the corresponding key, which indicate the deletion of the key from the table.
Tombstones do not trigger the join.</li>
</ul>
</div></blockquote>
</li>
</ul>
<pclass="last">See the semantics overview at the bottom of this section for a detailed description.</p>
<p><strong>Data must be co-partitioned</strong>: The input data for both sides must be <aclass="reference internal"href="#streams-developer-guide-dsl-joins-co-partitioning"><spanclass="std std-ref">co-partitioned</span></a>.</p>
<p><strong>Causes data re-partitioning of the stream if and only if the stream was marked for re-partitioning.</strong></p>
<p>Several variants of <codeclass="docutils literal"><spanclass="pre">leftJoin</span></code> exist; see the Javadocs for details.</p>
<spanclass="o">.</span><spanclass="na">withValueSerde</span><spanclass="o">(</span><spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Long</span><spanclass="o">())</span><spanclass="cm">/* left value */</span>
<spanclass="o">.</span><spanclass="na">withValueSerde</span><spanclass="o">(</span><spanclass="n">Serdes</span><spanclass="o">.</span><spanclass="na">Long</span><spanclass="o">())</span><spanclass="cm">/* left value */</span>
<spanclass="o">);</span>
</pre></div>
</div>
<p>Detailed behavior:</p>
<ul>
<li><pclass="first">The join is <em>key-based</em>, i.e. with the join predicate <codeclass="docutils literal"><spanclass="pre">leftRecord.key</span><spanclass="pre">==</span><spanclass="pre">rightRecord.key</span></code>.</p>
</li>
<li><pclass="first">The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called to produce
join output records.</p>
<blockquote>
<div><ulclass="simple">
<li>Only input records for the left side (stream) trigger the join. Input records for the right side (table) update only the internal right-side join state.</li>
<li>Input records for the stream with a <codeclass="docutils literal"><spanclass="pre">null</span></code> key or a <codeclass="docutils literal"><spanclass="pre">null</span></code> value are ignored and do not trigger the join.</li>
<li>Input records for the table with a <codeclass="docutils literal"><spanclass="pre">null</span></code> value are interpreted as <em>tombstones</em> for the corresponding key, which indicate the deletion of the key from the table.
Tombstones do not trigger the join.</li>
</ul>
</div></blockquote>
</li>
<li><pclass="first">For each input record on the left side that does not have any match on the right side, the <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called with <codeclass="docutils literal"><spanclass="pre">ValueJoiner#apply(leftRecord.value,</span><spanclass="pre">null)</span></code>;
this explains the row with timestamp=3 in the table below, which lists <codeclass="docutils literal"><spanclass="pre">[A,</span><spanclass="pre">null]</span></code> in the LEFT JOIN column.</p>
</li>
</ul>
<pclass="last">See the semantics overview at the bottom of this section for a detailed description.</p>
</td>
</tr>
</tbody>
</table>
<p><strong>Semantics of stream-table joins:</strong>
The semantics of the various stream-table join variants are explained below.
To improve the readability of the table we assume that (1) all records have the same key (and thus we omit the key in
the table) and that (2) all records are processed in timestamp order.
The columns INNER JOIN and LEFT JOIN denote what is passed as arguments to the user-supplied
<aclass="reference external"href="../javadocs/org/apache/kafka/streams/kstream/ValueJoiner.html">ValueJoiner</a> for the <codeclass="docutils literal"><spanclass="pre">join</span></code> and <codeclass="docutils literal"><spanclass="pre">leftJoin</span></code>
methods, respectively, whenever a new input record is received on either side of the join. An empty table
cell denotes that the <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> is not called at all.</p>
<spanid="streams-developer-guide-dsl-joins-kstream-globalktable"></span><h5><aclass="toc-backref"href="#id18">KStream-GlobalKTable Join</a><aclass="headerlink"href="#kstream-globalktable-join"title="Permalink to this headline"></a></h5>
<p>KStream-GlobalKTable joins are always <em>non-windowed</em> joins. They allow you to perform <em>table lookups</em> against a
<aclass="reference internal"href="../concepts.html#streams-concepts-globalktable"><spanclass="std std-ref">GlobalKTable</span></a> (entire changelog stream) upon receiving a new record from the
KStream (record stream). An example use case would be “star queries” or “star joins”, where you would enrich a stream
of user activities (KStream) with the latest user profile information (GlobalKTable) and further context information
(further GlobalKTables).</p>
<p>At a high-level, KStream-GlobalKTable joins are very similar to
<aclass="reference internal"href="#streams-developer-guide-dsl-joins-kstream-ktable"><spanclass="std std-ref">KStream-KTable joins</span></a>. However, global tables provide you
with much more flexibility at <aclass="reference internal"href="../concepts.html#streams-concepts-globalktable"><spanclass="std std-ref">some expense</span></a> when compared to partitioned
tables:</p>
<ulclass="simple">
<li>They do not require <aclass="reference internal"href="#streams-developer-guide-dsl-joins-co-partitioning"><spanclass="std std-ref">data co-partitioning</span></a>.</li>
<li>They allow for efficient “star joins”; i.e., joining a large-scale “facts” stream against “dimension” tables.</li>
<li>They allow for joining against foreign keys; i.e., you can lookup data in the table not just by the keys of records in the
stream, but also by data in the record values.</li>
<li>They make many use cases feasible where you must work on heavily skewed data and thus suffer from hot partitions.</li>
<li>They are often more efficient than their partitioned KTable counterpart when you need to perform multiple joins in
succession.</li>
</ul>
<p>Join output records are effectively created as follows, leveraging the user-supplied <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code>:</p>
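<p>Schematically, where <codeclass="docutils literal"><spanclass="pre">GK</span></code> and <codeclass="docutils literal"><spanclass="pre">GV</span></code> are placeholders for the global table’s key and value types:</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span>KeyValue<K, LV> leftRecord = ...;          // new record arriving on the stream (left) side
KeyValueMapper<K, LV, GK> keyMapper = ...; // user-supplied, maps into the table's keyspace
ValueJoiner<LV, GV, JV> joiner = ...;      // user-supplied

GK lookupKey = keyMapper.apply(leftRecord.key, leftRecord.value);
GV rightValue = ...;                       // current value for lookupKey in the GlobalKTable

KeyValue<K, JV> joinOutputRecord = KeyValue.pair(
    leftRecord.key, /* the original stream record's key is retained */
    joiner.apply(leftRecord.value, rightValue)
  );
</pre></div>
</div>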
<p>The <codeclass="docutils literal"><spanclass="pre">GlobalKTable</span></code> is fully bootstrapped upon (re)start of a <codeclass="docutils literal"><spanclass="pre">KafkaStreams</span></code> instance, which means the table is fully populated with all the data in the underlying topic that is
available at the time of the startup. The actual data processing begins only once the bootstrapping has completed.</p>
<p><strong>Causes data re-partitioning of the stream if and only if the stream was marked for re-partitioning.</strong></p>
<spanclass="o">(</span><spanclass="n">leftKey</span><spanclass="o">,</span><spanclass="n">leftValue</span><spanclass="o">)</span><spanclass="o">-></span><spanclass="n">leftKey</span><spanclass="o">.</span><spanclass="na">length</span><spanclass="o">(),</span><spanclass="cm">/* derive a (potentially) new key by which to lookup against the table */</span>
<spanclass="k">new</span><spanclass="n">KeyValueMapper</span><spanclass="o"><</span><spanclass="n">String</span><spanclass="o">,</span><spanclass="n">Long</span><spanclass="o">,</span><spanclass="n">Integer</span><spanclass="o">>()</span><spanclass="o">{</span><spanclass="cm">/* derive a (potentially) new key by which to lookup against the table */</span>
<li><pclass="first">The join is indirectly <em>key-based</em>, i.e. with the join predicate <codeclass="docutils literal"><spanclass="pre">KeyValueMapper#apply(leftRecord.key,</span><spanclass="pre">leftRecord.value)</span><spanclass="pre">==</span><spanclass="pre">rightRecord.key</span></code>.</p>
</li>
<li><pclass="first">The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called to produce
join output records.</p>
<blockquote>
<div><ulclass="simple">
<li>Only input records for the left side (stream) trigger the join. Input records for the right side (table) update only the internal right-side join state.</li>
<li>Input records for the stream with a <codeclass="docutils literal"><spanclass="pre">null</span></code> key or a <codeclass="docutils literal"><spanclass="pre">null</span></code> value are ignored and do not trigger the join.</li>
<li>Input records for the table with a <codeclass="docutils literal"><spanclass="pre">null</span></code> value are interpreted as <em>tombstones</em>, which indicate the deletion of a record key from the table. Tombstones do not trigger the join.</li>
</ul>
</div></blockquote>
</li>
</ul>
<p>The <codeclass="docutils literal"><spanclass="pre">GlobalKTable</span></code> is fully bootstrapped upon (re)start of a <codeclass="docutils literal"><spanclass="pre">KafkaStreams</span></code> instance, which means the table is fully populated with all the data in the underlying topic that is
available at the time of the startup. The actual data processing begins only once the bootstrapping has completed.</p>
<p><strong>Causes data re-partitioning of the stream if and only if the stream was marked for re-partitioning.</strong></p>
<spanclass="o">(</span><spanclass="n">leftKey</span><spanclass="o">,</span><spanclass="n">leftValue</span><spanclass="o">)</span><spanclass="o">-></span><spanclass="n">leftKey</span><spanclass="o">.</span><spanclass="na">length</span><spanclass="o">(),</span><spanclass="cm">/* derive a (potentially) new key by which to lookup against the table */</span>
<spanclass="k">new</span><spanclass="n">KeyValueMapper</span><spanclass="o"><</span><spanclass="n">String</span><spanclass="o">,</span><spanclass="n">Long</span><spanclass="o">,</span><spanclass="n">Integer</span><spanclass="o">>()</span><spanclass="o">{</span><spanclass="cm">/* derive a (potentially) new key by which to lookup against the table */</span>
<li><pclass="first">The join is indirectly <em>key-based</em>, i.e. with the join predicate <codeclass="docutils literal"><spanclass="pre">KeyValueMapper#apply(leftRecord.key,</span><spanclass="pre">leftRecord.value)</span><spanclass="pre">==</span><spanclass="pre">rightRecord.key</span></code>.</p>
</li>
<li><pclass="first">The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called to produce
join output records.</p>
<blockquote>
<div><ulclass="simple">
<li>Only input records for the left side (stream) trigger the join. Input records for the right side (table) update only the internal right-side join state.</li>
<li>Input records for the stream with a <codeclass="docutils literal"><spanclass="pre">null</span></code> key or a <codeclass="docutils literal"><spanclass="pre">null</span></code> value are ignored and do not trigger the join.</li>
<li>Input records for the table with a <codeclass="docutils literal"><spanclass="pre">null</span></code> value are interpreted as <em>tombstones</em>, which indicate the deletion of a record key from the table. Tombstones do not trigger the
join.</li>
</ul>
</div></blockquote>
</li>
<li><pclass="first">For each input record on the left side that does not have any match on the right side, the <codeclass="docutils literal"><spanclass="pre">ValueJoiner</span></code> will be called with <codeclass="docutils literal"><spanclass="pre">ValueJoiner#apply(leftRecord.value,</span><spanclass="pre">null)</span></code>.</p>
</li>
</ul>
</td>
</tr>
</tbody>
</table>
<p><strong>Semantics of stream-table joins:</strong>
The join semantics are identical to <aclass="reference internal"href="#streams-developer-guide-dsl-joins-kstream-ktable"><spanclass="std std-ref">KStream-KTable joins</span></a>.
The only difference is that, for KStream-GlobalKTable joins, the left input record is first “mapped” with
a user-supplied <codeclass="docutils literal"><spanclass="pre">KeyValueMapper</span></code> into the table’s keyspace prior to the table lookup.</p>
</div>
</div>
<divclass="section"id="windowing">
<spanid="streams-developer-guide-dsl-windowing"></span><h4><aclass="toc-backref"href="#id19">Windowing</a><aclass="headerlink"href="#windowing"title="Permalink to this headline"></a></h4>
<p>Windowing lets you control how to group records that have the same key for stateful operations such as
<aclass="reference internal"href="#streams-developer-guide-dsl-aggregating"><spanclass="std std-ref">aggregations</span></a> or <aclass="reference internal"href="#streams-developer-guide-dsl-joins"><spanclass="std std-ref">joins</span></a> into
so-called windows. Windows are tracked per record key.</p>
<divclass="admonition note">
<pclass="first admonition-title">Note</p>
<pclass="last">A related operation is <aclass="reference internal"href="#streams-developer-guide-dsl-transformations-stateless"><spanclass="std std-ref">grouping</span></a>, which groups all
records that have the same key to ensure that data is properly partitioned (“keyed”) for subsequent operations.
Once grouped, windowing allows you to further sub-group the records of a key.</p>
</div>
<p>For example, in join operations, a windowing state store is used to store all the records received so far within the
defined window boundary. In aggregating operations, a windowing state store is used to store the latest aggregation
results per window.
Old records in the state store are purged after the specified retention period.
Kafka Streams guarantees to keep a window for at least this specified time; the default value is one day and can be
changed via <codeclass="docutils literal"><spanclass="pre">Windows#until()</span></code> and <codeclass="docutils literal"><spanclass="pre">SessionWindows#until()</span></code>.</p>
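<p>For example, a sketch that retains the data of a (hypothetical) 5-minute tumbling window for at least seven days:</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span>import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.kstream.TimeWindows;

// A 5-minute tumbling window whose windowed data is kept for at least 7 days.
TimeWindows.of(TimeUnit.MINUTES.toMillis(5))
           .until(TimeUnit.DAYS.toMillis(7));
</pre></div>
</div>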
<p>The DSL supports the following types of windows: tumbling time windows, hopping time windows, sliding time windows, and session windows.</p>
<spanid="windowing-tumbling"></span><h5><aclass="toc-backref"href="#id20">Tumbling time windows</a><aclass="headerlink"href="#tumbling-time-windows"title="Permalink to this headline"></a></h5>
<p>Tumbling time windows are a special case of hopping time windows and, like the latter, are windows based on time
intervals. They model fixed-size, non-overlapping, gap-less windows.
A tumbling window is defined by a single property: the window’s <em>size</em>.
A tumbling window is a hopping window whose window size is equal to its advance interval.
Since tumbling windows never overlap, a data record will belong to one and only one window.</p>
<pclass="caption"><spanclass="caption-text">This diagram shows windowing a stream of data records with tumbling windows. Windows do not overlap because, by
definition, the advance interval is identical to the window size. In this diagram the time numbers represent minutes;
e.g. t=5 means “at the five-minute mark”. In reality, the unit of time in Kafka Streams is milliseconds, which means
the time numbers would need to be multiplied by 60 * 1,000 to convert from minutes to milliseconds (e.g. t=5 would
become t=300,000).</span></p>
</div>
<p>Tumbling time windows are <em>aligned to the epoch</em>, with the lower interval bound being inclusive and the upper bound
being exclusive. “Aligned to the epoch” means that the first window starts at timestamp zero. For example, tumbling
windows with a size of 5000ms have predictable window boundaries <codeclass="docutils literal"><spanclass="pre">[0;5000),[5000;10000),...</span></code>— and <strong>not</strong>
<codeclass="docutils literal"><spanclass="pre">[1000;6000),[6000;11000),...</span></code> or even something “random” like <codeclass="docutils literal"><spanclass="pre">[1452;6452),[6452;11452),...</span></code>.</p>
<p>The following code defines a tumbling window with a size of 5 minutes:</p>
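<divclass="highlight-java"><divclass="highlight"><pre><span></span>import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.kstream.TimeWindows;

// A tumbling time window with a size of 5 minutes (and, by definition, an implicit
// advance interval of 5 minutes).
long windowSizeMs = TimeUnit.MINUTES.toMillis(5);
TimeWindows.of(windowSizeMs);

// The above is equivalent to the following code:
TimeWindows.of(windowSizeMs).advanceBy(windowSizeMs);
</pre></div>
</div>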
<spanid="windowing-hopping"></span><h5><aclass="toc-backref"href="#id21">Hopping time windows</a><aclass="headerlink"href="#hopping-time-windows"title="Permalink to this headline"></a></h5>
<p>Hopping time windows are windows based on time intervals. They model fixed-sized, (possibly) overlapping windows.
A hopping window is defined by two properties: the window’s <em>size</em> and its <em>advance interval</em> (aka “hop”). The advance
interval specifies by how much a window moves forward relative to the previous one. For example, you can configure a
hopping window with a size of 5 minutes and an advance interval of 1 minute. Since hopping windows can overlap – and in
general they do – a data record may belong to more than one such window.</p>
<divclass="admonition note">
<pclass="first admonition-title">Note</p>
<pclass="last"><strong>Hopping windows vs. sliding windows:</strong>
Hopping windows are sometimes called “sliding windows” in other stream processing tools. Kafka Streams follows the
terminology in academic literature, where the semantics of sliding windows are different from those of hopping windows.</p>
</div>
<p>The following code defines a hopping window with a size of 5 minutes and an advance interval of 1 minute:</p>
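<divclass="highlight-java"><divclass="highlight"><pre><span></span>import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.kstream.TimeWindows;

// A hopping time window with a size of 5 minutes and an advance interval of 1 minute.
long windowSizeMs = TimeUnit.MINUTES.toMillis(5);
long advanceMs    = TimeUnit.MINUTES.toMillis(1);
TimeWindows.of(windowSizeMs).advanceBy(advanceMs);
</pre></div>
</div>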
<pclass="caption"><spanclass="caption-text">This diagram shows windowing a stream of data records with hopping windows. In this diagram the time numbers
represent minutes; e.g. t=5 means “at the five-minute mark”. In reality, the unit of time in Kafka Streams is
milliseconds, which means the time numbers would need to be multiplied by 60 * 1,000 to convert from minutes to
milliseconds (e.g. t=5 would become t=300,000).</span></p>
</div>
<p>Hopping time windows are <em>aligned to the epoch</em>, with the lower interval bound being inclusive and the upper bound
being exclusive. “Aligned to the epoch” means that the first window starts at timestamp zero. For example, hopping
windows with a size of 5000ms and an advance interval (“hop”) of 3000ms have predictable window boundaries
<codeclass="docutils literal"><spanclass="pre">[0;5000),[3000;8000),...</span></code>— and <strong>not</strong><codeclass="docutils literal"><spanclass="pre">[1000;6000),[4000;9000),...</span></code> or even something “random” like
<p>Unlike the non-windowed aggregates that we have seen previously, windowed aggregates return a <em>windowed KTable</em> whose key
type is <codeclass="docutils literal"><spanclass="pre">Windowed<K></span></code>. This is to differentiate aggregate values with the same key from different windows. The
corresponding window instance and the embedded key can be retrieved as <codeclass="docutils literal"><spanclass="pre">Windowed#window()</span></code> and <codeclass="docutils literal"><spanclass="pre">Windowed#key()</span></code>,
respectively.</p>
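<p>For example, a sketch that reads the embedded key and the window start time from the result of a (hypothetical) windowed count:</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span>KTable<Windowed<String>, Long> wordCounts = ...; // e.g. the result of a windowed count

wordCounts.toStream().foreach((windowedKey, count) -> {
    String word = windowedKey.key();                   // the embedded record key
    long windowStartMs = windowedKey.window().start(); // the corresponding window instance
    // ... e.g. format the (word, window, count) triple for downstream use ...
});
</pre></div>
</div>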
</div>
<divclass="section"id="sliding-time-windows">
<spanid="windowing-sliding"></span><h5><aclass="toc-backref"href="#id22">Sliding time windows</a><aclass="headerlink"href="#sliding-time-windows"title="Permalink to this headline"></a></h5>
<p>Sliding windows are actually quite different from hopping and tumbling windows. In Kafka Streams, sliding windows
are used only for <aclass="reference internal"href="#streams-developer-guide-dsl-joins"><spanclass="std std-ref">join operations</span></a>, and can be specified through the <codeclass="docutils literal"><spanclass="pre">JoinWindows</span></code> class.</p>
<p>A sliding window models a fixed-size window that slides continuously over the time axis; here, two data records are
said to be included in the same window if (in the case of symmetric windows) the difference of their timestamps is
within the window size. Thus, sliding windows are not aligned to the epoch, but to the data record timestamps. In
contrast to hopping and tumbling windows, the lower and upper window time interval bounds of sliding windows are
<em>both inclusive</em>.</p>
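<p>For example, a sketch of a symmetric sliding join window over a maximum timestamp difference of 5 minutes:</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span>import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.kstream.JoinWindows;

// Two records join when their timestamps are at most 5 minutes apart
// (both interval bounds are inclusive).
JoinWindows.of(TimeUnit.MINUTES.toMillis(5));
</pre></div>
</div>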
</div>
<divclass="section"id="session-windows">
<spanid="windowing-session"></span><h5><aclass="toc-backref"href="#id23">Session Windows</a><aclass="headerlink"href="#session-windows"title="Permalink to this headline"></a></h5>
<p>Session windows are used to aggregate key-based events into so-called <em>sessions</em>, the process of which is referred to
as <em>sessionization</em>. Sessions represent a <strong>period of activity</strong> separated by a defined <strong>gap of inactivity</strong> (or
“idleness”). Any events processed that fall within the inactivity gap of any existing sessions are merged into the
existing sessions. If an event falls outside of the session gap, then a new session will be created.</p>
<p>Session windows are different from the other window types in that:</p>
<ulclass="simple">
<li>all windows are tracked independently across keys – e.g. windows of different keys typically have different start
and end times</li>
<li>their window sizes vary – even windows for the same key typically have different sizes</li>
</ul>
<p>The prime area of application for session windows is <strong>user behavior analysis</strong>. Session-based analyses can range from
simple metrics (e.g. count of user visits on a news website or social platform) to more complex metrics (e.g. customer
conversion funnel and event flows).</p>
<p>The following code defines a session window with an inactivity gap of 5 minutes:</p>
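<divclass="highlight-java"><divclass="highlight"><pre><span></span>import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.kstream.SessionWindows;

// A session window with an inactivity gap of 5 minutes.
SessionWindows.with(TimeUnit.MINUTES.toMillis(5));
</pre></div>
</div>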
<pclass="caption"><spanclass="caption-text">Detected sessions after having received three input records: two records for the green record key at t=0 and t=6, and
one record for the blue record key at t=2.
In this diagram the time numbers represent minutes; e.g. t=5 means “at the five-minute mark”. In reality, the unit
of time in Kafka Streams is milliseconds, which means the time numbers would need to be multiplied by 60 * 1,000 to
convert from minutes to milliseconds (e.g. t=5 would become t=300,000).</span></p>
</div>
<p>If we then receive three additional records (including two late-arriving records), what would happen is that the two
existing sessions for the green record key will be merged into a single session starting at time 0 and ending at time 6,
consisting of a total of three records. The existing session for the blue record key will be extended to end at time 5,
consisting of a total of two records. And, finally, there will be a new session for the blue key starting and ending at time 11.</p>
<pclass="caption"><spanclass="caption-text">Detected sessions after having received six input records. Note the two late-arriving data records at t=4 (green) and
t=5 (blue), which lead to a merge of sessions and an extension of a session, respectively.</span></p>
<spanid="streams-developer-guide-dsl-process"></span><h3><aclass="toc-backref"href="#id24">Applying processors and transformers (Processor API integration)</a><aclass="headerlink"href="#applying-processors-and-transformers-processor-api-integration"title="Permalink to this headline"></a></h3>
<p>Beyond the aforementioned <aclass="reference internal"href="#streams-developer-guide-dsl-transformations-stateless"><spanclass="std std-ref">stateless</span></a> and
<aclass="reference internal"href="#streams-developer-guide-dsl-transformations-stateless"><spanclass="std std-ref">stateful</span></a> transformations, you may also
leverage the <aclass="reference internal"href="processor-api.html#streams-developer-guide-processor-api"><spanclass="std std-ref">Processor API</span></a> from the DSL.
There are a number of scenarios where this may be helpful:</p>
<ulclass="simple">
<li><strong>Customization:</strong> You need to implement special, customized logic that is not or not yet available in the DSL.</li>
<li><strong>Combining ease-of-use with full flexibility where it’s needed:</strong> Even though you generally prefer to use
the expressiveness of the DSL, there are certain steps in your processing that require more flexibility and
tinkering than the DSL provides. For example, only the Processor API provides access to a
<aclass="reference internal"href="../faq.html#streams-faq-processing-record-metadata"><spanclass="std std-ref">record’s metadata</span></a> such as its topic, partition, and offset information.
However, you don’t want to switch completely to the Processor API just because of that.</li>
<li><strong>Migrating from other tools:</strong> You are migrating from other stream processing technologies that provide an
imperative API, and migrating some of your legacy code to the Processor API was faster and/or easier than migrating it completely to the DSL.</li>
</ul>
<td><pclass="first"><strong>Terminal operation.</strong> Applies a <codeclass="docutils literal"><spanclass="pre">Processor</span></code> to each record.
<codeclass="docutils literal"><spanclass="pre">process()</span></code> allows you to leverage the <aclass="reference internal"href="processor-api.html#streams-developer-guide-processor-api"><spanclass="std std-ref">Processor API</span></a> from the DSL.
<p>This is essentially equivalent to adding the <codeclass="docutils literal"><spanclass="pre">Processor</span></code> via <codeclass="docutils literal"><spanclass="pre">Topology#addProcessor()</span></code> to your processor topology.</p>
<td><pclass="first">Applies a <codeclass="docutils literal"><spanclass="pre">Transformer</span></code> to each record.
<codeclass="docutils literal"><spanclass="pre">transform()</span></code> allows you to leverage the <aclass="reference internal"href="processor-api.html#streams-developer-guide-processor-api"><spanclass="std std-ref">Processor API</span></a> from the DSL.
<p>Each input record is transformed into zero, one, or more output records (similar to the stateless <codeclass="docutils literal"><spanclass="pre">flatMap</span></code>).
The <codeclass="docutils literal"><spanclass="pre">Transformer</span></code> must return <codeclass="docutils literal"><spanclass="pre">null</span></code> for zero output.
You can modify the record’s key and value, including their types.</p>
<p><strong>Marks the stream for data re-partitioning:</strong>
Applying a grouping or a join after <codeclass="docutils literal"><spanclass="pre">transform</span></code> will result in re-partitioning of the records.
If possible use <codeclass="docutils literal"><spanclass="pre">transformValues</span></code> instead, which will not cause data re-partitioning.</p>
<p><codeclass="docutils literal"><spanclass="pre">transform</span></code> is essentially equivalent to adding the <codeclass="docutils literal"><spanclass="pre">Transformer</span></code> via <codeclass="docutils literal"><spanclass="pre">Topology#addProcessor()</span></code> to your
<td><pclass="first">Applies a <codeclass="docutils literal"><spanclass="pre">ValueTransformer</span></code> to each record, while retaining the key of the original record.
<codeclass="docutils literal"><spanclass="pre">transformValues()</span></code> allows you to leverage the <aclass="reference internal"href="processor-api.html#streams-developer-guide-processor-api"><spanclass="std std-ref">Processor API</span></a> from the DSL.
<p>Each input record is transformed into exactly one output record (zero output records or multiple output records are not possible).
The <codeclass="docutils literal"><spanclass="pre">ValueTransformer</span></code> may return <codeclass="docutils literal"><spanclass="pre">null</span></code> as the new value for a record.</p>
<p><codeclass="docutils literal"><spanclass="pre">transformValues</span></code> is preferable to <codeclass="docutils literal"><spanclass="pre">transform</span></code> because it will not cause data re-partitioning.</p>
<p><codeclass="docutils literal"><spanclass="pre">transformValues</span></code> is essentially equivalent to adding the <codeclass="docutils literal"><spanclass="pre">ValueTransformer</span></code> via <codeclass="docutils literal"><spanclass="pre">Topology#addProcessor()</span></code> to your
<p>The following example shows how to leverage, via the <codeclass="docutils literal"><spanclass="pre">KStream#process()</span></code> method, a custom <codeclass="docutils literal"><spanclass="pre">Processor</span></code> that sends an
email notification whenever a page view count reaches a predefined threshold.</p>
<p>First, we need to implement a custom stream processor, <codeclass="docutils literal"><spanclass="pre">PopularPageEmailAlert</span></code>, that implements the <codeclass="docutils literal"><spanclass="pre">Processor</span></code>
interface:</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span><spanclass="c1">// A processor that sends an alert message about a popular page to a configurable email address</span>
<spanclass="c1">// Any code for clean up would go here. This processor instance will not be used again after this call.</span>
<spanclass="o">}</span>
<spanclass="o">}</span>
</pre></div>
</div>
<divclass="admonition tip">
<pclass="first admonition-title">Tip</p>
<pclass="last">Even though we do not demonstrate it in this example, a stream processor can access any available state stores by
calling <codeclass="docutils literal"><spanclass="pre">ProcessorContext#getStateStore()</span></code>. Only such state stores are available that (1) have been named in the
corresponding <codeclass="docutils literal"><spanclass="pre">KStream#process()</span></code> method call (note that this is a different method than <codeclass="docutils literal"><spanclass="pre">Processor#process()</span></code>),
plus (2) all global stores. Note that global stores do not need to be attached explicitly; however, they only
allow for read-only access.</p>
</div>
<p>Then we can leverage the <codeclass="docutils literal"><spanclass="pre">PopularPageEmailAlert</span></code> processor in the DSL via <codeclass="docutils literal"><spanclass="pre">KStream#process</span></code>.</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span><spanclass="c1">// Send an email notification when the view count of a page reaches one thousand.</span>
<spanid="streams-developer-guide-dsl-destinations"></span><h2><aclass="toc-backref"href="#id25">Writing streams back to Kafka</a><aclass="headerlink"href="#writing-streams-back-to-kafka"title="Permalink to this headline"></a></h2>
<p>Any streams and tables may be (continuously) written back to a Kafka topic. As we will describe in more detail below, the output data might be
re-partitioned on its way to Kafka, depending on the situation.</p>
<li>If you do not specify SerDes explicitly, the default SerDes from the
<aclass="reference internal"href="config-streams.html#streams-developer-guide-configuration"><spanclass="std std-ref">configuration</span></a> are used.</li>
<li>You <strong>must specify SerDes explicitly</strong> via the <codeclass="docutils literal"><spanclass="pre">Produced</span></code> class if the key and/or value types of the
<codeclass="docutils literal"><spanclass="pre">KStream</span></code> do not match the configured default SerDes.</li>
<li>See <aclass="reference internal"href="datatypes.html#streams-developer-guide-serdes"><spanclass="std std-ref">Data Types and Serialization</span></a> for information about configuring default SerDes, available SerDes,
and implementing your own custom SerDes.</li>
</ul>
<p>A variant of <codeclass="docutils literal"><spanclass="pre">to</span></code> exists that enables you to specify how the data is produced by using a <codeclass="docutils literal"><spanclass="pre">Produced</span></code>
instance to specify, for example, a <codeclass="docutils literal"><spanclass="pre">StreamPartitioner</span></code> that gives you control over
how output records are distributed across the partitions of the output topic.</p>
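<p>For example, a sketch with explicit SerDes (the topic name is illustrative):</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span>KStream<String, Long> stream = ...;

// Write the stream to the output topic, using the explicitly specified
// key and value SerDes rather than the configured defaults.
stream.to("my-stream-output-topic", Produced.with(Serdes.String(), Serdes.Long()));
</pre></div>
</div>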
<p><strong>Causes data re-partitioning if any of the following conditions is true:</strong></p>
<olclass="last arabic simple">
<li>If the output topic has a different number of partitions than the stream/table.</li>
<li>If the <codeclass="docutils literal"><spanclass="pre">KStream</span></code> was marked for re-partitioning.</li>
<li>If you provide a custom <codeclass="docutils literal"><spanclass="pre">StreamPartitioner</span></code> to explicitly control how to distribute the output records
across the partitions of the output topic.</li>
<li>If the key of an output record is <codeclass="docutils literal"><spanclass="pre">null</span></code>.</li>
<td><pclass="first">Write the records to a Kafka topic and create a new stream/table from that topic.
Essentially a shorthand for <codeclass="docutils literal"><spanclass="pre">KStream#to()</span></code> followed by <codeclass="docutils literal"><spanclass="pre">StreamsBuilder#stream()</span></code>, same for tables.
<li>If you do not specify SerDes explicitly, the default SerDes from the
<aclass="reference internal"href="config-streams.html#streams-developer-guide-configuration"><spanclass="std std-ref">configuration</span></a> are used.</li>
<li>You <strong>must specify SerDes explicitly</strong> if the key and/or value types of the <codeclass="docutils literal"><spanclass="pre">KStream</span></code> or <codeclass="docutils literal"><spanclass="pre">KTable</span></code> do not
match the configured default SerDes.</li>
<li>See <aclass="reference internal"href="datatypes.html#streams-developer-guide-serdes"><spanclass="std std-ref">Data Types and Serialization</span></a> for information about configuring default SerDes, available SerDes,
and implementing your own custom SerDes.</li>
</ul>
<p>A variant of <codeclass="docutils literal"><spanclass="pre">through</span></code> exists that enables you to specify how the data is produced by using a <codeclass="docutils literal"><spanclass="pre">Produced</span></code>
instance to specify, for example, a <codeclass="docutils literal"><spanclass="pre">StreamPartitioner</span></code> that gives you control over
how output records are distributed across the partitions of the output topic.</p>
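<p>For example, a sketch (again with an illustrative topic name):</p>
<divclass="highlight-java"><divclass="highlight"><pre><span></span>KStream<String, Long> stream = ...;

// Write to a user-created topic and continue processing from it as a new stream.
KStream<String, Long> newStream =
    stream.through("user-defined-through-topic", Produced.with(Serdes.String(), Serdes.Long()));
</pre></div>
</div>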
<p><strong>Causes data re-partitioning if any of the following conditions is true:</strong></p>
<olclass="last arabic simple">
<li>If the output topic has a different number of partitions than the stream/table.</li>
<li>If the <codeclass="docutils literal"><spanclass="pre">KStream</span></code> was marked for re-partitioning.</li>
<li>If you provide a custom <codeclass="docutils literal"><spanclass="pre">StreamPartitioner</span></code> to explicitly control how to distribute the output records
across the partitions of the output topic.</li>
<li>If the key of an output record is <codeclass="docutils literal"><spanclass="pre">null</span></code>.</li>
</ol>
</td>
</tr>
</tbody>
</table>
<divclass="admonition note">
<pclass="first admonition-title">Note</p>
<pclass="last"><strong>When you want to write to systems other than Kafka:</strong>
Besides writing the data back to Kafka, you can also apply a
<aclass="reference internal"href="#streams-developer-guide-dsl-process"><spanclass="std std-ref">custom processor</span></a> as a stream sink at the end of the processing to, for
example, write to external databases. Doing so is not a recommended pattern – we strongly suggest using the
<aclass="reference internal"href="../../connect/index.html#kafka-connect"><spanclass="std std-ref">Kafka Connect API</span></a> instead. However, if you do use such a sink processor, please be aware that
it is now your responsibility to guarantee message delivery semantics when talking to such external systems (e.g., to
retry on delivery failure or to prevent message duplication).</p>