<spanid="streams-developer-guide-processor-api"></span><h1>Processor API<aclass="headerlink"href="#processor-api"title="Permalink to this headline"></a></h1>
<p>The Processor API allows developers to define and connect custom processors and to interact with state stores. With the
Processor API, you can define arbitrary stream processors that process one received record at a time, and connect these
processors with their associated state stores to compose the processor topology that represents a customized processing
logic.</p>
<divclass="contents local topic"id="table-of-contents">
<li><aclass="reference internal"href="#defining-and-creating-a-state-store"id="id4">Defining and creating a State Store</a></li>
<li><aclass="reference internal"href="#fault-tolerant-state-stores"id="id5">Fault-tolerant State Stores</a></li>
<li><aclass="reference internal"href="#enable-or-disable-fault-tolerance-of-state-stores-store-changelogs"id="id6">Enable or Disable Fault Tolerance of State Stores (Store Changelogs)</a></li>
<li><aclass="reference internal"href="#implementing-custom-state-stores"id="id7">Implementing Custom State Stores</a></li>
<h2><aclass="toc-backref"href="#id1">Overview</a><aclass="headerlink"href="#overview"title="Permalink to this headline"></a></h2>
<p>The Processor API can be used to implement both <strong>stateless</strong> as well as <strong>stateful</strong> operations, where the latter is
achieved through the use of <aclass="reference internal"href="#streams-developer-guide-state-store"><spanclass="std std-ref">state stores</span></a>.</p>
<pclass="last"><strong>Combining the DSL and the Processor API:</strong>
You can combine the convenience of the DSL with the power and flexibility of the Processor API as described in the
section <aclass="reference internal"href="dsl-api.html#streams-developer-guide-dsl-process"><spanclass="std std-ref">Applying processors and transformers (Processor API integration)</span></a>.</p>
<p>For a complete list of available API functionality, see the <ahref="/{{version}}/javadoc/org/apache/kafka/streams/package-summary.html">Streams</a> API docs.</p>
<spanid="streams-developer-guide-stream-processor"></span><h2><aclass="toc-backref"href="#id2">Defining a Stream Processor</a><aclass="headerlink"href="#defining-a-stream-processor"title="Permalink to this headline"></a></h2>
<p>A <aclass="reference internal"href="../core-concepts.html#streams_processor_node"><spanclass="std std-ref">stream processor</span></a> is a node in the processor topology that represents a single processing step.
With the Processor API, you can define arbitrary stream processors that processes one received record at a time, and connect
these processors with their associated state stores to compose the processor topology.</p>
<p>You can define a customized stream processor by implementing the <codeclass="docutils literal"><spanclass="pre">Processor</span></code> interface, which provides the <codeclass="docutils literal"><spanclass="pre">process()</span></code> API method.
The <codeclass="docutils literal"><spanclass="pre">process()</span></code> method is called on each of the received records.</p>
<p>The <codeclass="docutils literal"><spanclass="pre">Processor</span></code> interface also has an <codeclass="docutils literal"><spanclass="pre">init()</span></code> method, which is called by the Kafka Streams library during task construction
phase. Processor instances should perform any required initialization in this method. The <codeclass="docutils literal"><spanclass="pre">init()</span></code> method passes in a <codeclass="docutils literal"><spanclass="pre">ProcessorContext</span></code>
instance, which provides access to the metadata of the currently processed record, including its source Kafka topic and partition,
its corresponding message offset, and further such information. You can also use this context instance to schedule a punctuation
function (via <codeclass="docutils literal"><spanclass="pre">ProcessorContext#schedule()</span></code>), to forward a new record as a key-value pair to the downstream processors (via <codeclass="docutils literal"><spanclass="pre">ProcessorContext#forward()</span></code>),
and to commit the current processing progress (via <codeclass="docutils literal"><spanclass="pre">ProcessorContext#commit()</span></code>).
Any resources you set up in <codeclass="docutils literal"><spanclass="pre">init()</span></code> can be cleaned up in the
<codeclass="docutils literal"><spanclass="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
<codeclass="docutils literal"><spanclass="pre">Processor</span></code> object by calling
<codeclass="docutils literal"><spanclass="pre">init()</span></code> on it again after <codeclass="docutils literal"><spanclass="pre">close()</span></code>.</p>
<p>When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors:
(1) If <codeclass="docutils literal"><spanclass="pre">#forward()</span></code> is called within <codeclass="docutils literal"><spanclass="pre">#process()</span></code> the output record inherits the input record timestamp.
(2) If <codeclass="docutils literal"><spanclass="pre">#forward()</span></code> is called within <codeclass="docutils literal"><spanclass="pre">punctuate()</span></code></p> the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time).
Note, that <codeclass="docutils literal"><spanclass="pre">#forward()</span></code> also allows to change the default behavior by passing a custom timestamp for the output record.</p>
<p>Specifically, <codeclass="docutils literal"><spanclass="pre">ProcessorContext#schedule()</span></code> accepts a user <codeclass="docutils literal"><spanclass="pre">Punctuator</span></code> callback interface, which triggers its <codeclass="docutils literal"><spanclass="pre">punctuate()</span></code>
API method periodically based on the <codeclass="docutils literal"><spanclass="pre">PunctuationType</span></code>. The <codeclass="docutils literal"><spanclass="pre">PunctuationType</span></code> determines what notion of time is used
for the punctuation scheduling: either <aclass="reference internal"href="../core-concepts.html#streams_time"><spanclass="std std-ref">stream-time</span></a> or wall-clock-time (by default, stream-time
is configured to represent event-time via <codeclass="docutils literal"><spanclass="pre">TimestampExtractor</span></code>). When stream-time is used, <codeclass="docutils literal"><spanclass="pre">punctuate()</span></code> is triggered purely
by data because stream-time is determined (and advanced forward) by the timestamps derived from the input data. When there
is no new input data arriving, stream-time is not advanced and thus <codeclass="docutils literal"><spanclass="pre">punctuate()</span></code> is not called.</p>
<p>For example, if you schedule a <codeclass="docutils literal"><spanclass="pre">Punctuator</span></code> function every 10 seconds based on <codeclass="docutils literal"><spanclass="pre">PunctuationType.STREAM_TIME</span></code> and if you
process a stream of 60 records with consecutive timestamps from 1 (first record) to 60 seconds (last record),
then <codeclass="docutils literal"><spanclass="pre">punctuate()</span></code> would be called 6 times. This happens regardless of the time required to actually process those records. <codeclass="docutils literal"><spanclass="pre">punctuate()</span></code>
would be called 6 times regardless of whether processing these 60 records takes a second, a minute, or an hour.</p>
<p>When wall-clock-time (i.e. <codeclass="docutils literal"><spanclass="pre">PunctuationType.WALL_CLOCK_TIME</span></code>) is used, <codeclass="docutils literal"><spanclass="pre">punctuate()</span></code> is triggered purely by the wall-clock time.
Reusing the example above, if the <codeclass="docutils literal"><spanclass="pre">Punctuator</span></code> function is scheduled based on <codeclass="docutils literal"><spanclass="pre">PunctuationType.WALL_CLOCK_TIME</span></code>, and if these
60 records were processed within 20 seconds, <codeclass="docutils literal"><spanclass="pre">punctuate()</span></code> is called 2 times (one time every 10 seconds). If these 60 records
were processed within 5 seconds, then no <codeclass="docutils literal"><spanclass="pre">punctuate()</span></code> is called at all. Note that you can schedule multiple <codeclass="docutils literal"><spanclass="pre">Punctuator</span></code>
callbacks with different <codeclass="docutils literal"><spanclass="pre">PunctuationType</span></code> types within the same processor by calling <codeclass="docutils literal"><spanclass="pre">ProcessorContext#schedule()</span></code> multiple
times inside <codeclass="docutils literal"><spanclass="pre">init()</span></code> method.</p>
<pclass="last">Stream-time is only advanced if all input partitions over all input topics have new data (with newer timestamps) available.
If at least one partition does not have any new data available, stream-time will not be advanced and thus <codeclass="docutils literal"><spanclass="pre">punctuate()</span></code> will not be triggered if <codeclass="docutils literal"><spanclass="pre">PunctuationType.STREAM_TIME</span></code> was specified.
This behavior is independent of the configured timestamp extractor, i.e., using <codeclass="docutils literal"><spanclass="pre">WallclockTimestampExtractor</span></code> does not enable wall-clock triggering of <codeclass="docutils literal"><spanclass="pre">punctuate()</span></code>.</p>
<p>The following example <codeclass="docutils literal"><spanclass="pre">Processor</span></code> defines a simple word-count algorithm and the following actions are performed:</p>
<ulclass="simple">
<li>In the <codeclass="docutils literal"><spanclass="pre">init()</span></code> method, schedule the punctuation every 1000 time units (the time unit is normally milliseconds, which in this example would translate to punctuation every 1 second) and retrieve the local state store by its name “Counts”.</li>
<li>In the <codeclass="docutils literal"><spanclass="pre">process()</span></code> method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this later in this section).</li>
<li>In the <codeclass="docutils literal"><spanclass="pre">punctuate()</span></code> method, iterate the local state store and send the aggregated counts to the downstream processor (we will talk about downstream processors later in this section), and commit the current stream state.</li>
<pclass="last"><strong>Stateful processing with state stores:</strong>
The <codeclass="docutils literal"><spanclass="pre">WordCountProcessor</span></code> defined above can access the currently received record in its <codeclass="docutils literal"><spanclass="pre">process()</span></code> method, and it can
leverage <aclass="reference internal"href="#streams-developer-guide-state-store"><spanclass="std std-ref">state stores</span></a> to maintain processing states to, for example, remember recently
arrived records for stateful processing needs like aggregations and joins. For more information, see the <aclass="reference internal"href="#streams-developer-guide-state-store"><spanclass="std std-ref">state stores</span></a> documentation.</p>
<spanid="streams-developer-guide-state-store"></span><h2><aclass="toc-backref"href="#id3">State Stores</a><aclass="headerlink"href="#state-stores"title="Permalink to this headline"></a></h2>
<p>To implement a <strong>stateful</strong><codeclass="docutils literal"><spanclass="pre">Processor</span></code> or <codeclass="docutils literal"><spanclass="pre">Transformer</span></code>, you must provide one or more state stores to the processor
or transformer (<em>stateless</em> processors or transformers do not need state stores). State stores can be used to remember
recently received input records, to track rolling aggregates, to de-duplicate input records, and more.
Another feature of state stores is that they can be
<aclass="reference internal"href="interactive-queries.html#streams-developer-guide-interactive-queries"><spanclass="std std-ref">interactively queried</span></a> from other applications, such as a
NodeJS-based dashboard or a microservice implemented in Scala or Go.</p>
<p>The
<aclass="reference internal"href="#streams-developer-guide-state-store-defining"><spanclass="std std-ref">available state store types</span></a> in Kafka Streams have
<aclass="reference internal"href="#streams-developer-guide-state-store-fault-tolerance"><spanclass="std std-ref">fault tolerance</span></a> enabled by default.</p>
<spanid="streams-developer-guide-state-store-defining"></span><h3><aclass="toc-backref"href="#id4">Defining and creating a State Store</a><aclass="headerlink"href="#defining-and-creating-a-state-store"title="Permalink to this headline"></a></h3>
<p>You can either use one of the available store types or
<aclass="reference internal"href="#streams-developer-guide-state-store-custom"><spanclass="std std-ref">implement your own custom store type</span></a>.
It’s common practice to leverage an existing store type via the <codeclass="docutils literal"><spanclass="pre">Stores</span></code> factory.</p>
<p>Note that, when using Kafka Streams, you normally don’t create or instantiate state stores directly in your code.
Rather, you define state stores indirectly by creating a so-called <codeclass="docutils literal"><spanclass="pre">StoreBuilder</span></code>. This buildeer is used by
Kafka Streams as a factory to instantiate the actual state stores locally in application instances when and where
needed.</p>
<p>The following store types are available out of the box.</p>
<spanid="streams-developer-guide-state-store-fault-tolerance"></span><h3><aclass="toc-backref"href="#id5">Fault-tolerant State Stores</a><aclass="headerlink"href="#fault-tolerant-state-stores"title="Permalink to this headline"></a></h3>
<p>To make state stores fault-tolerant and to allow for state store migration without data loss, a state store can be
continuously backed up to a Kafka topic behind the scenes. For example, to migrate a stateful stream task from one
machine to another when <aclass="reference internal"href="running-app.html#streams-developer-guide-execution-scaling"><spanclass="std std-ref">elastically adding or removing capacity from your application</span></a>.
This topic is sometimes referred to as the state store’s associated <em>changelog topic</em>, or its <em>changelog</em>. For example, if
you experience machine failure, the state store and the application’s state can be fully restored from its changelog. You can
<aclass="reference internal"href="#streams-developer-guide-state-store-enable-disable-fault-tolerance"><spanclass="std std-ref">enable or disable this backup feature</span></a> for a
state store.</p>
<p>By default, persistent key-value stores are fault-tolerant. They are backed by a
<aclass="reference external"href="https://kafka.apache.org/documentation.html#compaction">compacted</a> changelog topic. The purpose of compacting this
topic is to prevent the topic from growing indefinitely, to reduce the storage consumed in the associated Kafka cluster,
and to minimize recovery time if a state store needs to be restored from its changelog topic.</p>
<p>Similarly, persistent window stores are fault-tolerant. They are backed by a topic that uses both compaction and
deletion. Because of the structure of the message keys that are being sent to the changelog topics, this combination of
deletion and compaction is required for the changelog topics of window stores. For window stores, the message keys are
composite keys that include the “normal” key and window timestamps. For these types of composite keys it would not
be sufficient to only enable compaction to prevent a changelog topic from growing out of bounds. With deletion
enabled, old windows that have expired will be cleaned up by Kafka’s log cleaner as the log segments expire. The
default retention setting is <codeclass="docutils literal"><spanclass="pre">Windows#maintainMs()</span></code> + 1 day. You can override this setting by specifying
<codeclass="docutils literal"><spanclass="pre">StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG</span></code> in the <codeclass="docutils literal"><spanclass="pre">StreamsConfig</span></code>.</p>
<p>When you open an <codeclass="docutils literal"><spanclass="pre">Iterator</span></code> from a state store you must call <codeclass="docutils literal"><spanclass="pre">close()</span></code> on the iterator when you are done working with
it to reclaim resources; or you can use the iterator from within a try-with-resources statement. If you do not close an iterator,
<spanid="streams-developer-guide-state-store-enable-disable-fault-tolerance"></span><h3><aclass="toc-backref"href="#id6">Enable or Disable Fault Tolerance of State Stores (Store Changelogs)</a><aclass="headerlink"href="#enable-or-disable-fault-tolerance-of-state-stores-store-changelogs"title="Permalink to this headline"></a></h3>
<p>You can enable or disable fault tolerance for a state store by enabling or disabling the change logging
of the store through <codeclass="docutils literal"><spanclass="pre">enableLogging()</span></code> and <codeclass="docutils literal"><spanclass="pre">disableLogging()</span></code>.
You can also fine-tune the associated topic’s configuration if needed.</p>
<spanclass="o">.</span><spanclass="na">withLoggingDisabled</span><spanclass="o">();</span><spanclass="c1">// disable backing up the store to a changelog topic</span>
</pre></div>
</div>
<divclass="admonition attention">
<pclass="first admonition-title">Attention</p>
<pclass="last">If the changelog is disabled then the attached state store is no longer fault tolerant and it can’t have any <aclass="reference internal"href="config-streams.html#streams-developer-guide-standby-replicas"><spanclass="std std-ref">standby replicas</span></a>.</p>
</div>
<p>Here is an example for enabling fault tolerance, with additional changelog-topic configuration:
You can add any log config from <aclass="reference external"href="https://github.com/apache/kafka/blob/1.0/core/src/main/scala/kafka/log/LogConfig.scala#L61">kafka.log.LogConfig</a>.
<spanid="streams-developer-guide-state-store-custom"></span><h3><aclass="toc-backref"href="#id7">Implementing Custom State Stores</a><aclass="headerlink"href="#implementing-custom-state-stores"title="Permalink to this headline"></a></h3>
<p>You can use the <aclass="reference internal"href="#streams-developer-guide-state-store-defining"><spanclass="std std-ref">built-in state store types</span></a> or implement your own.
The primary interface to implement for the store is
<codeclass="docutils literal"><spanclass="pre">org.apache.kafka.streams.processor.StateStore</span></code>. Kafka Streams also has a few extended interfaces such
as <codeclass="docutils literal"><spanclass="pre">KeyValueStore</span></code>.</p>
<p>Note that your customized <codeclass="docutils literal"><spanclass="pre">org.apache.kafka.streams.processor.StateStore</span></code> implementation also needs to provide the logic on how to restore the state
via the <codeclass="docutils literal"><spanclass="pre">org.apache.kafka.streams.processor.StateRestoreCallback</span></code> or <codeclass="docutils literal"><spanclass="pre">org.apache.kafka.streams.processor.BatchingStateRestoreCallback</span></code> interface.
Details on how to instantiate these interfaces can be found in the <aclass="reference external"href="/{{version}}/javadoc/org/apache/kafka/streams/processor/StateStore.html">javadocs</a>.</p>
<p>You also need to provide a “builder” for the store by implementing the
<codeclass="docutils literal"><spanclass="pre">org.apache.kafka.streams.state.StoreBuilder</span></code> interface, which Kafka Streams uses to create instances of
<h2><aclass="toc-backref"href="#id10">Accessing Processor Context</a><aclass="headerlink"href="#accessing-processor-context"title="Permalink to this headline"></a></h2>
<p>As we have mentioned in the <ahref=#defining-a-stream-processor>Defining a Stream Processor</a> section, a <code>ProcessorContext</code> control the processing workflow, such as scheduling a punctuation function, and committing the current processed state.</p>
and <codeclass="docutils literal"><spanclass="pre">stateDir</span></code>, and also record related metadata as <codeclass="docutils literal"><spanclass="pre">topic</span></code>,
<codeclass="docutils literal"><spanclass="pre">partition</span></code>, <codeclass="docutils literal"><spanclass="pre">offset</span></code>, <codeclass="docutils literal"><spanclass="pre">timestamp</span></code> and
<h2><aclass="toc-backref"href="#id8">Connecting Processors and State Stores</a><aclass="headerlink"href="#connecting-processors-and-state-stores"title="Permalink to this headline"></a></h2>
<p>Now that a <aclass="reference internal"href="#streams-developer-guide-stream-processor"><spanclass="std std-ref">processor</span></a> (WordCountProcessor) and the
state stores have been defined, you can construct the processor topology by connecting these processors and state stores together by
using the <codeclass="docutils literal"><spanclass="pre">Topology</span></code> instance. In addition, you can add source processors with the specified Kafka topics
to generate input data streams into the topology, and sink processors with the specified Kafka topics to generate
<p>Here is a quick explanation of this example:</p>
<ulclass="simple">
<li>A source processor node named <codeclass="docutils literal"><spanclass="pre">"Source"</span></code> is added to the topology using the <codeclass="docutils literal"><spanclass="pre">addSource</span></code> method, with one Kafka topic
<codeclass="docutils literal"><spanclass="pre">"source-topic"</span></code> fed to it.</li>
<li>A processor node named <codeclass="docutils literal"><spanclass="pre">"Process"</span></code> with the pre-defined <codeclass="docutils literal"><spanclass="pre">WordCountProcessor</span></code> logic is then added as the downstream
processor of the <codeclass="docutils literal"><spanclass="pre">"Source"</span></code> node using the <codeclass="docutils literal"><spanclass="pre">addProcessor</span></code> method.</li>
<li>A predefined persistent key-value state store is created and associated with the <codeclass="docutils literal"><spanclass="pre">"Process"</span></code> node, using
<li>A sink processor node is then added to complete the topology using the <codeclass="docutils literal"><spanclass="pre">addSink</span></code> method, taking the <codeclass="docutils literal"><spanclass="pre">"Process"</span></code> node
as its upstream processor and writing to a separate <codeclass="docutils literal"><spanclass="pre">"sink-topic"</span></code> Kafka topic (note that users can also use another overloaded variant of <codeclass="docutils literal"><spanclass="pre">addSink</span></code>
to dynamically determine the Kafka topic to write to for each received record from the upstream processor).</li>
<p>In this topology, the <codeclass="docutils literal"><spanclass="pre">"Process"</span></code> stream processor node is considered a downstream processor of the <codeclass="docutils literal"><spanclass="pre">"Source"</span></code> node, and an
upstream processor of the <codeclass="docutils literal"><spanclass="pre">"Sink"</span></code> node. As a result, whenever the <codeclass="docutils literal"><spanclass="pre">"Source"</span></code> node forwards a newly fetched record from
Kafka to its downstream <codeclass="docutils literal"><spanclass="pre">"Process"</span></code> node, the <codeclass="docutils literal"><spanclass="pre">WordCountProcessor#process()</span></code> method is triggered to process the record and
update the associated state store. Whenever <codeclass="docutils literal"><spanclass="pre">context#forward()</span></code> is called in the
<codeclass="docutils literal"><spanclass="pre">WordCountProcessor#punctuate()</span></code> method, the aggregate key-value pair will be sent via the <codeclass="docutils literal"><spanclass="pre">"Sink"</span></code> processor node to
the Kafka topic <codeclass="docutils literal"><spanclass="pre">"sink-topic"</span></code>. Note that in the <codeclass="docutils literal"><spanclass="pre">WordCountProcessor</span></code> implementation, you must refer to the
same store name <codeclass="docutils literal"><spanclass="pre">"Counts"</span></code> when accessing the key-value store, otherwise an exception will be thrown at runtime,
indicating that the state store cannot be found. If the state store is not associated with the processor
in the <codeclass="docutils literal"><spanclass="pre">Topology</span></code> code, accessing it in the processor’s <codeclass="docutils literal"><spanclass="pre">init()</span></code> method will also throw an exception at
runtime, indicating the state store is not accessible from this processor.</p>
<p>Now that you have fully defined your processor topology in your application, you can proceed to
<aclass="reference internal"href="running-app.html#streams-developer-guide-execution"><spanclass="std std-ref">running the Kafka Streams application</span></a>.</p>