<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->

<script><!--#include virtual="../../js/templateData.js" --></script>
<script id="content-template" type="text/x-handlebars-template">
    <!-- h1>Developer Guide for Kafka Streams</h1 -->
    <div class="sub-nav-sticky">
        <div class="sticky-top">
            <!-- div style="height:35px">
                <a href="/{{version}}/documentation/streams/">Introduction</a>
                <a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
                <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
                <a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
                <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
            </div -->
        </div>
    </div>

    <div class="section" id="processor-api">
        <span id="streams-developer-guide-processor-api"></span><h1>Processor API<a class="headerlink" href="#processor-api" title="Permalink to this headline"></a></h1>
        <p>The Processor API allows developers to define and connect custom processors and to interact with state stores. With the
            Processor API, you can define arbitrary stream processors that process one received record at a time, and connect these
            processors with their associated state stores to compose the processor topology that represents a customized processing
            logic.</p>
<div class="contents local topic" id="table-of-contents"> |
|
<p class="topic-title first"><b>Table of Contents</b></p> |
|
<ul class="simple"> |
|
<li><a class="reference internal" href="#overview" id="id1">Overview</a></li> |
|
<li><a class="reference internal" href="#defining-a-stream-processor" id="id2">Defining a Stream |
|
Processor</a></li> |
|
<li><a class="reference internal" href="#unit-testing-processors" id="id9">Unit Testing Processors</a></li> |
|
<li><a class="reference internal" href="#state-stores" id="id3">State Stores</a> |
|
<ul> |
|
<li><a class="reference internal" href="#defining-and-creating-a-state-store" id="id4">Defining and creating a State Store</a></li> |
|
<li><a class="reference internal" href="#fault-tolerant-state-stores" id="id5">Fault-tolerant State Stores</a></li> |
|
<li><a class="reference internal" href="#enable-or-disable-fault-tolerance-of-state-stores-store-changelogs" id="id6">Enable or Disable Fault Tolerance of State Stores (Store Changelogs)</a></li> |
|
<li><a class="reference internal" href="#timestamped-state-stores" id="id11">Timestamped State Stores</a></li> |
|
<li><a class="reference internal" href="#versioned-state-stores" id="id12">Versioned Key-Value State Stores</a></li> |
|
<li><a class="reference internal" href="#implementing-custom-state-stores" id="id7">Implementing Custom State Stores</a></li> |
|
</ul> |
|
</li> |
|
<li><a class="reference internal" href="#connecting-processors-and-state-stores" id="id8">Connecting Processors and State Stores</a></li> |
|
<li><a class="reference internal" href="#accessing-processor-context" id="id10">Accessing Processor Context</a></li> |
|
</ul> |
|
</div> |
|
<div class="section" id="overview"> |
|
<h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#overview" title="Permalink to this headline"></a></h2> |
|
<p>The Processor API can be used to implement both <strong>stateless</strong> as well as <strong>stateful</strong> operations, where the latter is |
|
achieved through the use of <a class="reference internal" href="#streams-developer-guide-state-store"><span class="std std-ref">state stores</span></a>.</p> |
|
<div class="admonition tip"> |
|
<p><b>Tip</b></p> |
|
<p class="last"><strong>Combining the DSL and the Processor API:</strong> |
|
You can combine the convenience of the DSL with the power and flexibility of the Processor API as described in the |
|
section <a class="reference internal" href="dsl-api.html#streams-developer-guide-dsl-process"><span class="std std-ref">Applying processors and transformers (Processor API integration)</span></a>.</p> |
|
</div> |
|
<p>For a complete list of available API functionality, see the <a href="/{{version}}/javadoc/org/apache/kafka/streams/package-summary.html">Streams</a> API docs.</p> |
|
</div> |
|
<div class="section" id="defining-a-stream-processor"> |
|
<span id="streams-developer-guide-stream-processor"></span><h2><a class="toc-backref" href="#id2">Defining a Stream Processor</a><a class="headerlink" href="#defining-a-stream-processor" title="Permalink to this headline"></a></h2> |
|
<p>A <a class="reference internal" href="../core-concepts.html#streams_processor_node"><span class="std std-ref">stream processor</span></a> is a node in the processor topology that represents a single processing step. |
|
With the Processor API, you can define arbitrary stream processors that processes one received record at a time, and connect |
|
these processors with their associated state stores to compose the processor topology.</p> |
|
<p>You can define a customized stream processor by implementing the <code class="docutils literal"><span class="pre">Processor</span></code> interface, which provides the <code class="docutils literal"><span class="pre">process()</span></code> API method. |
|
The <code class="docutils literal"><span class="pre">process()</span></code> method is called on each of the received records.</p> |
|
<p>The <code class="docutils literal"><span class="pre">Processor</span></code> interface also has an <code class="docutils literal"><span class="pre">init()</span></code> method, which is called by the Kafka Streams library during task construction |
|
phase. Processor instances should perform any required initialization in this method. The <code class="docutils literal"><span class="pre">init()</span></code> method passes in a <code class="docutils literal"><span class="pre">ProcessorContext</span></code> |
|
instance, which provides access to the metadata of the currently processed record, including its source Kafka topic and partition, |
|
its corresponding message offset, and further such information. You can also use this context instance to schedule a punctuation |
|
function (via <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code>), to forward a new record as a key-value pair to the downstream processors (via <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>), |
|
and to commit the current processing progress (via <code class="docutils literal"><span class="pre">ProcessorContext#commit()</span></code>). |
|
Any resources you set up in <code class="docutils literal"><span class="pre">init()</span></code> can be cleaned up in the |
|
<code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single |
|
<code class="docutils literal"><span class="pre">Processor</span></code> object by calling |
|
<code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p> |
|
            <p>
                The <code class="docutils literal"><span class="pre">Processor</span></code> interface takes two sets of generic parameters:
                <code class="docutils literal"><span class="pre">KIn, VIn, KOut, VOut</span></code>. These define the input and output types
                that the processor implementation can handle. <code class="docutils literal"><span class="pre">KIn</span></code> and
                <code class="docutils literal"><span class="pre">VIn</span></code> define the key and value types that will be passed
                to <code class="docutils literal"><span class="pre">process()</span></code>.
                Likewise, <code class="docutils literal"><span class="pre">KOut</span></code> and <code class="docutils literal"><span class="pre">VOut</span></code>
                define the forwarded key and value types that <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
                will accept. If your processor does not forward any records at all (or if it only forwards
                <code class="docutils literal"><span class="pre">null</span></code> keys or values),
                a best practice is to set the output generic type argument to
                <code class="docutils literal"><span class="pre">Void</span></code>.
                If your processor needs to forward multiple types that don't share a common superclass, you will
                have to set the output generic type argument to <code class="docutils literal"><span class="pre">Object</span></code>.
            </p>
            <p>
                Both the <code class="docutils literal"><span class="pre">Processor#process()</span></code>
                and the <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
                methods handle records in the form of the <code class="docutils literal"><span class="pre">Record<K, V></span></code>
                data class. This class gives you access to the main components of a Kafka record:
                the key, value, timestamp and headers. When forwarding records, you can use the
                constructor to create a new <code class="docutils literal"><span class="pre">Record</span></code>
                from scratch, or you can use the convenience builder methods to replace one of the
                <code class="docutils literal"><span class="pre">Record</span></code>'s properties
                and copy over the rest. For example,
                <code class="docutils literal"><span class="pre">inputRecord.withValue(newValue)</span></code>
                would copy the key, timestamp, and headers from
                <code class="docutils literal"><span class="pre">inputRecord</span></code> while
                setting the output record's value to <code class="docutils literal"><span class="pre">newValue</span></code>.
                Note that this does not mutate <code class="docutils literal"><span class="pre">inputRecord</span></code>,
                but instead creates a shallow copy. Because it is only a shallow copy, if you
                plan to mutate the key, value, or headers elsewhere in the program, you will want to
                create a deep copy of those fields yourself.
            </p>
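            <p>To make this concrete, here is a minimal sketch of a stateless processor (the class name <code class="docutils literal"><span class="pre">UppercaseProcessor</span></code> is illustrative) that forwards a shallow copy of each input record with a transformed value:</p>
            <pre class="line-numbers"><code class="language-java">import java.util.Locale;

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class UppercaseProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(final Record<String, String> record) {
        // withValue() returns a shallow copy with the same key, timestamp, and headers;
        // the input record itself is not mutated
        context.forward(record.withValue(record.value().toUpperCase(Locale.ROOT)));
    }
}</code></pre>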
            <p>
                In addition to handling incoming records via
                <code class="docutils literal"><span class="pre">Processor#process()</span></code>,
                you have the option to schedule periodic invocations (called "punctuation")
                in your processor's <code class="docutils literal"><span class="pre">init()</span></code>
                method by calling <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code>
                and passing it a <code class="docutils literal"><span class="pre">Punctuator</span></code>.
                The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
                for the punctuation scheduling: either <a class="reference internal" href="../core-concepts.html#streams_time"><span class="std std-ref">stream-time</span></a> or wall-clock-time (by default, stream-time
                is configured to represent event-time via <code class="docutils literal"><span class="pre">TimestampExtractor</span></code>). When stream-time is used, <code class="docutils literal"><span class="pre">punctuate()</span></code> is triggered purely
                by data because stream-time is determined (and advanced forward) by the timestamps derived from the input data. When there
                is no new input data arriving, stream-time is not advanced and thus <code class="docutils literal"><span class="pre">punctuate()</span></code> is not called.</p>
            <p>For example, if you schedule a <code class="docutils literal"><span class="pre">Punctuator</span></code> function every 10 seconds based on <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> and if you
                process a stream of 60 records with consecutive timestamps from 1 (first record) to 60 seconds (last record),
                then <code class="docutils literal"><span class="pre">punctuate()</span></code> would be called 6 times. This happens regardless of the time required to actually process those records: <code class="docutils literal"><span class="pre">punctuate()</span></code>
                would be called 6 times regardless of whether processing these 60 records takes a second, a minute, or an hour.</p>
            <p>When wall-clock-time (i.e. <code class="docutils literal"><span class="pre">PunctuationType.WALL_CLOCK_TIME</span></code>) is used, <code class="docutils literal"><span class="pre">punctuate()</span></code> is triggered purely by the wall-clock time.
                Reusing the example above, if the <code class="docutils literal"><span class="pre">Punctuator</span></code> function is scheduled based on <code class="docutils literal"><span class="pre">PunctuationType.WALL_CLOCK_TIME</span></code>, and if these
                60 records were processed within 20 seconds, <code class="docutils literal"><span class="pre">punctuate()</span></code> is called 2 times (one time every 10 seconds). If these 60 records
                were processed within 5 seconds, then no <code class="docutils literal"><span class="pre">punctuate()</span></code> is called at all. Note that you can schedule multiple <code class="docutils literal"><span class="pre">Punctuator</span></code>
                callbacks with different <code class="docutils literal"><span class="pre">PunctuationType</span></code> types within the same processor by calling <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> multiple
                times inside the <code class="docutils literal"><span class="pre">init()</span></code> method.</p>
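            <p>For illustration, here is a minimal sketch of an <code class="docutils literal"><span class="pre">init()</span></code> method that registers two punctuators with different time semantics; the callback bodies are placeholders:</p>
            <pre class="line-numbers"><code class="language-java">@Override
public void init(final ProcessorContext<String, String> context) {
    // fires whenever stream-time advances past the next 10-second boundary,
    // i.e. it is driven purely by the timestamps of the input data
    context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME,
        timestamp -> { /* e.g. emit aggregated results */ });

    // fires every 30 seconds of wall-clock time, independent of input data
    context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME,
        timestamp -> { /* e.g. flush buffered records */ });
}</code></pre>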
<div class="admonition attention"> |
|
<p class="first admonition-title"><b>Attention</b></p> |
|
<p class="last">Stream-time is only advanced when Streams processes records. |
|
If there are no records to process, or if Streams is waiting for new records |
|
due to the <a class="reference internal" href="/documentation/#streamsconfigs_max.task.idle.ms">Task Idling</a> |
|
configuration, then the stream time will not advance and <code class="docutils literal"><span class="pre">punctuate()</span></code> will not be triggered if <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> was specified. |
|
This behavior is independent of the configured timestamp extractor, i.e., using <code class="docutils literal"><span class="pre">WallclockTimestampExtractor</span></code> does not enable wall-clock triggering of <code class="docutils literal"><span class="pre">punctuate()</span></code>.</p> |
|
</div> |
|
            <p><b>Example</b></p>
            <p>The following example <code class="docutils literal"><span class="pre">Processor</span></code> implements a simple word-count algorithm, in which the following actions are performed:</p>
            <ul class="simple">
                <li>In the <code class="docutils literal"><span class="pre">init()</span></code> method, schedule the punctuation every 1 second (based on stream-time) and retrieve the local state store by its name &#8220;Counts&#8221;.</li>
                <li>In the <code class="docutils literal"><span class="pre">process()</span></code> method, upon each received record, split the value string into words, and update their counts in the state store (we will talk about this later in this section).</li>
                <li>In the punctuation callback, iterate the local state store, send the aggregated counts to the downstream processor (we will talk about downstream processors later in this section), and commit the current stream state.</li>
            </ul>
<pre class="line-numbers"><code class="language-java">public class WordCountProcessor implements Processor<String, String, String, String> { |
|
private KeyValueStore<String, Integer> kvStore; |
|
|
|
@Override |
|
public void init(final ProcessorContext<String, String> context) { |
|
context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> { |
|
try (final KeyValueIterator<String, Integer> iter = kvStore.all()) { |
|
while (iter.hasNext()) { |
|
final KeyValue<String, Integer> entry = iter.next(); |
|
context.forward(new Record<>(entry.key, entry.value.toString(), timestamp)); |
|
} |
|
} |
|
}); |
|
kvStore = context.getStateStore("Counts"); |
|
} |
|
|
|
@Override |
|
public void process(final Record<String, String> record) { |
|
final String[] words = record.value().toLowerCase(Locale.getDefault()).split("\\W+"); |
|
|
|
for (final String word : words) { |
|
final Integer oldValue = kvStore.get(word); |
|
|
|
if (oldValue == null) { |
|
kvStore.put(word, 1); |
|
} else { |
|
kvStore.put(word, oldValue + 1); |
|
} |
|
} |
|
} |
|
|
|
@Override |
|
public void close() { |
|
// close any resources managed by this processor |
|
// Note: Do not close any StateStores as these are managed by the library |
|
} |
|
}</code></pre> |
|
<div class="admonition note"> |
|
<p><b>Note</b></p> |
|
<p class="last"><strong>Stateful processing with state stores:</strong> |
|
The <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> defined above can access the currently received record in its <code class="docutils literal"><span class="pre">process()</span></code> method, and it can |
|
leverage <a class="reference internal" href="#streams-developer-guide-state-store"><span class="std std-ref">state stores</span></a> to maintain processing states to, for example, remember recently |
|
arrived records for stateful processing needs like aggregations and joins. For more information, see the <a class="reference internal" href="#streams-developer-guide-state-store"><span class="std std-ref">state stores</span></a> documentation.</p> |
|
</div> |
|
</div> |
|
<div class="section" id="unit-testing-processors"> |
|
<h2> |
|
<a class="toc-backref" href="#id9">Unit Testing Processors</a> |
|
<a class="headerlink" href="#unit-testing-processors" title="Permalink to this headline"></a> |
|
</h2> |
|
<p> |
|
Kafka Streams comes with a <code>test-utils</code> module to help you write unit tests for your |
|
processors <a href="testing.html#unit-testing-processors">here</a>. |
|
</p> |
|
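            <p>As a quick sketch of what such a test can look like, the following wires the <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> from above into a <code class="docutils literal"><span class="pre">TopologyTestDriver</span></code>; the topic names and serdes are illustrative, and the exact output depends on how stream-time advances past the punctuation interval:</p>
            <pre class="line-numbers"><code class="language-java">import java.time.Instant;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.state.Stores;

public class WordCountProcessorTest {

    public static void main(final String[] args) {
        final Topology topology = new Topology();
        topology.addSource("Source", "source-topic");
        topology.addProcessor("Process", WordCountProcessor::new, "Source");
        topology.addStateStore(
            Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("Counts"),
                Serdes.String(),
                Serdes.Integer()),
            "Process");
        topology.addSink("Sink", "sink-topic", "Process");

        try (final TopologyTestDriver driver = new TopologyTestDriver(topology)) {
            final TestInputTopic<String, String> input = driver.createInputTopic(
                "source-topic", new StringSerializer(), new StringSerializer());
            final TestOutputTopic<String, String> output = driver.createOutputTopic(
                "sink-topic", new StringDeserializer(), new StringDeserializer());

            // pipe a record, then a later-timestamped record so that stream-time
            // advances past the one-second punctuation interval
            input.pipeInput("key", "hello kafka hello", Instant.ofEpochMilli(0L));
            input.pipeInput("key", "streams", Instant.ofEpochMilli(2000L));

            // the punctuator forwarded the aggregated counts, e.g. ("hello", "2")
            System.out.println(output.readKeyValuesToList());
        }
    }
}</code></pre>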
        </div>
<div class="section" id="state-stores"> |
|
<span id="streams-developer-guide-state-store"></span><h2><a class="toc-backref" href="#id3">State Stores</a><a class="headerlink" href="#state-stores" title="Permalink to this headline"></a></h2> |
|
<p>To implement a <strong>stateful</strong> <code class="docutils literal"><span class="pre">Processor</span></code> or <code class="docutils literal"><span class="pre">Transformer</span></code>, you must provide one or more state stores to the processor |
|
or transformer (<em>stateless</em> processors or transformers do not need state stores). State stores can be used to remember |
|
recently received input records, to track rolling aggregates, to de-duplicate input records, and more. |
|
Another feature of state stores is that they can be |
|
<a class="reference internal" href="interactive-queries.html#streams-developer-guide-interactive-queries"><span class="std std-ref">interactively queried</span></a> from other applications, such as a |
|
NodeJS-based dashboard or a microservice implemented in Scala or Go.</p> |
|
<p>The |
|
<a class="reference internal" href="#streams-developer-guide-state-store-defining"><span class="std std-ref">available state store types</span></a> in Kafka Streams have |
|
<a class="reference internal" href="#streams-developer-guide-state-store-fault-tolerance"><span class="std std-ref">fault tolerance</span></a> enabled by default.</p> |
|
<div class="section" id="defining-and-creating-a-state-store"> |
|
<span id="streams-developer-guide-state-store-defining"></span><h3><a class="toc-backref" href="#id4">Defining and creating a State Store</a><a class="headerlink" href="#defining-and-creating-a-state-store" title="Permalink to this headline"></a></h3> |
|
<p>You can either use one of the available store types or |
|
<a class="reference internal" href="#streams-developer-guide-state-store-custom"><span class="std std-ref">implement your own custom store type</span></a>. |
|
It’s common practice to leverage an existing store type via the <code class="docutils literal"><span class="pre">Stores</span></code> factory.</p> |
|
<p>Note that, when using Kafka Streams, you normally don’t create or instantiate state stores directly in your code. |
|
Rather, you define state stores indirectly by creating a so-called <code class="docutils literal"><span class="pre">StoreBuilder</span></code>. This builder is used by |
|
Kafka Streams as a factory to instantiate the actual state stores locally in application instances when and where |
|
needed.</p> |
|
<p>The following store types are available out of the box.</p> |
|
                <table border="1" class="non-scrolling-table width-100-percent docutils">
                    <colgroup>
                        <col width="19%" />
                        <col width="11%" />
                        <col width="18%" />
                        <col width="51%" />
                    </colgroup>
                    <thead valign="bottom">
                        <tr class="row-odd"><th class="head">Store Type</th>
                            <th class="head">Storage Engine</th>
                            <th class="head">Fault-tolerant?</th>
                            <th class="head">Description</th>
                        </tr>
                    </thead>
                    <tbody valign="top">
                        <tr class="row-even"><td>Persistent
                            <code class="docutils literal"><span class="pre">KeyValueStore<K,</span> <span class="pre">V></span></code></td>
                            <td>RocksDB</td>
                            <td>Yes (enabled by default)</td>
                            <td><ul class="first simple">
                                <li><strong>The recommended store type for most use cases.</strong></li>
                                <li>Stores its data on local disk.</li>
                                <li>Storage capacity:
                                    managed local state can be larger than the memory (heap space) of an
                                    application instance, but must fit into the available local disk
                                    space.</li>
                                <li>RocksDB settings can be fine-tuned, see
                                    <a class="reference internal" href="config-streams.html#streams-developer-guide-rocksdb-config"><span class="std std-ref">RocksDB configuration</span></a>.</li>
                                <li>Available <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore(java.lang.String)">store variants</a>:
                                    timestamped key-value store, versioned key-value store, time window key-value store, session window key-value store.</li>
                                <li>Use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedKeyValueStore(java.lang.String)">persistentTimestampedKeyValueStore</a>
                                    when you need a persistent key-(value/timestamp) store that supports put/get/delete and range queries.</li>
                                <li>Use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentVersionedKeyValueStore(java.lang.String,java.time.Duration)">persistentVersionedKeyValueStore</a>
                                    when you need a persistent, versioned key-(value/timestamp) store that supports put/get/delete and timestamped get operations.</li>
                                <li>Use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentWindowStore(java.lang.String,java.time.Duration,java.time.Duration,boolean)">persistentWindowStore</a>
                                    or <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedWindowStore(java.lang.String,java.time.Duration,java.time.Duration,boolean)">persistentTimestampedWindowStore</a>
                                    when you need a persistent time-windowed key-value or time-windowed key-(value/timestamp) store, respectively.</li>
                                <li>Use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentSessionStore(java.lang.String,java.time.Duration)">persistentSessionStore</a>
                                    when you need a persistent session-windowed key-value store.</li>
                            </ul>
                            <pre class="line-numbers"><code class="language-java">// Creating a persistent key-value store:
// here, we create a `KeyValueStore<String, Long>` named "persistent-counts".
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Using a `KeyValueStoreBuilder` to build a `KeyValueStore`.
StoreBuilder<KeyValueStore<String, Long>> countStoreBuilder =
  Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("persistent-counts"),
    Serdes.String(),
    Serdes.Long());
KeyValueStore<String, Long> countStore = countStoreBuilder.build();</code></pre>
                            </td>
                        </tr>
                        <tr class="row-odd"><td>In-memory
                            <code class="docutils literal"><span class="pre">KeyValueStore<K,</span> <span class="pre">V></span></code></td>
                            <td>-</td>
                            <td>Yes (enabled by default)</td>
                            <td><ul class="first simple">
                                <li>Stores its data in memory.</li>
                                <li>Storage capacity:
                                    managed local state must fit into the memory (heap space) of an
                                    application instance.</li>
                                <li>Useful when application instances run in an environment where local
                                    disk space is either not available or local disk space is wiped
                                    in-between app instance restarts.</li>
                                <li>Available <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#inMemoryKeyValueStore-java.lang.String-">store variants</a>:
                                    time window key-value store, session window key-value store.</li>
                                <li>Use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/TimestampedKeyValueStore.html">TimestampedKeyValueStore</a>
                                    when you need a key-(value/timestamp) store that supports put/get/delete and range queries.</li>
                                <li>Use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/TimestampedWindowStore.html">TimestampedWindowStore</a>
                                    when you need to store windowed key-(value/timestamp) pairs.</li>
                                <li>There is no built-in in-memory, versioned key-value store at this time.</li>
                            </ul>
                            <pre class="line-numbers"><code class="language-java">// Creating an in-memory key-value store:
// here, we create a `KeyValueStore<String, Long>` named "inmemory-counts".
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Using a `KeyValueStoreBuilder` to build a `KeyValueStore`.
StoreBuilder<KeyValueStore<String, Long>> countStoreBuilder =
  Stores.keyValueStoreBuilder(
    Stores.inMemoryKeyValueStore("inmemory-counts"),
    Serdes.String(),
    Serdes.Long());
KeyValueStore<String, Long> countStore = countStoreBuilder.build();</code></pre>
                            </td>
                        </tr>
                    </tbody>
                </table>
            </div>
<div class="section" id="fault-tolerant-state-stores"> |
|
<span id="streams-developer-guide-state-store-fault-tolerance"></span><h3><a class="toc-backref" href="#id5">Fault-tolerant State Stores</a><a class="headerlink" href="#fault-tolerant-state-stores" title="Permalink to this headline"></a></h3> |
|
<p>To make state stores fault-tolerant and to allow for state store migration without data loss, a state store can be |
|
continuously backed up to a Kafka topic behind the scenes. For example, to migrate a stateful stream task from one |
|
machine to another when <a class="reference internal" href="running-app.html#streams-developer-guide-execution-scaling"><span class="std std-ref">elastically adding or removing capacity from your application</span></a>. |
|
This topic is sometimes referred to as the state store’s associated <em>changelog topic</em>, or its <em>changelog</em>. For example, if |
|
you experience machine failure, the state store and the application’s state can be fully restored from its changelog. You can |
|
<a class="reference internal" href="#streams-developer-guide-state-store-enable-disable-fault-tolerance"><span class="std std-ref">enable or disable this backup feature</span></a> for a |
|
state store.</p> |
|
<p>Fault-tolerant state stores are backed by a |
|
<a class="reference external" href="https://kafka.apache.org/documentation.html#compaction">compacted</a> changelog topic. The purpose of compacting this |
|
topic is to prevent the topic from growing indefinitely, to reduce the storage consumed in the associated Kafka cluster, |
|
and to minimize recovery time if a state store needs to be restored from its changelog topic.</p> |
|
<p>Fault-tolerant windowed state stores are backed by a topic that uses both compaction and |
|
deletion. Because of the structure of the message keys that are being sent to the changelog topics, this combination of |
|
deletion and compaction is required for the changelog topics of window stores. For window stores, the message keys are |
|
composite keys that include the “normal” key and window timestamps. For these types of composite keys it would not |
|
be sufficient to only enable compaction to prevent a changelog topic from growing out of bounds. With deletion |
|
enabled, old windows that have expired will be cleaned up by Kafka’s log cleaner as the log segments expire. The |
|
default retention setting is <code class="docutils literal"><span class="pre">Windows#maintainMs()</span></code> + 1 day. You can override this setting by specifying |
|
<code class="docutils literal"><span class="pre">StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG</span></code> in the <code class="docutils literal"><span class="pre">StreamsConfig</span></code>.</p> |
|
<p>When you open an <code class="docutils literal"><span class="pre">Iterator</span></code> from a state store you must call <code class="docutils literal"><span class="pre">close()</span></code> on the iterator when you are done working with |
|
it to reclaim resources; or you can use the iterator from within a try-with-resources statement. If you do not close an iterator, |
|
you may encounter an OOM error.</p> |
|
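                <p>For example, assuming a <code class="docutils literal"><span class="pre">KeyValueStore<String, Long></span></code> named <code class="docutils literal"><span class="pre">countStore</span></code> as created above, a full scan with try-with-resources looks like this:</p>
                <pre class="line-numbers"><code class="language-java">// the iterator is closed automatically when the try block exits
try (final KeyValueIterator<String, Long> iter = countStore.all()) {
    while (iter.hasNext()) {
        final KeyValue<String, Long> entry = iter.next();
        System.out.println(entry.key + " -> " + entry.value);
    }
}</code></pre>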
            </div>
<div class="section" id="enable-or-disable-fault-tolerance-of-state-stores-store-changelogs"> |
|
<span id="streams-developer-guide-state-store-enable-disable-fault-tolerance"></span><h3><a class="toc-backref" href="#id6">Enable or Disable Fault Tolerance of State Stores (Store Changelogs)</a><a class="headerlink" href="#enable-or-disable-fault-tolerance-of-state-stores-store-changelogs" title="Permalink to this headline"></a></h3> |
|
<p>You can enable or disable fault tolerance for a state store by enabling or disabling the change logging |
|
of the store through <code class="docutils literal"><span class="pre">enableLogging()</span></code> and <code class="docutils literal"><span class="pre">disableLogging()</span></code>. |
|
You can also fine-tune the associated topic’s configuration if needed.</p> |
|
<p>Example for disabling fault-tolerance:</p> |
|
<pre class="line-numbers"><code class="language-java">import org.apache.kafka.streams.state.StoreBuilder; |
|
import org.apache.kafka.streams.state.Stores; |
|
|
|
StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier = Stores.keyValueStoreBuilder( |
|
Stores.persistentKeyValueStore("Counts"), |
|
Serdes.String(), |
|
Serdes.Long()) |
|
.withLoggingDisabled(); // disable backing up the store to a changelog topic</code></pre> |
|
<div class="admonition attention"> |
|
<p class="first admonition-title">Attention</p> |
|
<p class="last">If the changelog is disabled then the attached state store is no longer fault tolerant and it can’t have any <a class="reference internal" href="config-streams.html#streams-developer-guide-standby-replicas"><span class="std std-ref">standby replicas</span></a>.</p> |
|
</div> |
|
                <p>Here is an example for enabling fault tolerance, with additional changelog-topic configuration:
                    You can add any log config from <a class="reference external" href="https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogConfig.scala">kafka.log.LogConfig</a>.
                    Unrecognized configs will be ignored.</p>
                <pre class="line-numbers"><code class="language-java">import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

Map<String, String> changelogConfig = new HashMap<>();
// override min.insync.replicas
changelogConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1");

StoreBuilder<KeyValueStore<String, Long>> countStoreBuilder = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("Counts"),
    Serdes.String(),
    Serdes.Long())
  .withLoggingEnabled(changelogConfig); // enable changelogging, with custom changelog settings</code></pre>
            </div>
<div class="section" id="timestamped-state-stores"> |
|
<span id="streams-developer-guide-state-store-timestamps"></span><h3><a class="toc-backref" href="#id11">Timestamped State Stores</a><a class="headerlink" href="#timestamped-state-stores" title="Permalink to this headline"></a></h3> |
|
<p>KTables always store timestamps by default. |
|
A timestamped state store improves stream processing semantics and enables |
|
handling out-of-order data in source KTables, detecting out-of-order joins and aggregations, |
|
and getting the timestamp of the latest update in an Interactive Query.</p> |
|
<p>You can query timestamped state stores both with and without a timestamp.</p> |
|
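                <p>As a minimal sketch (the store name <code class="docutils literal"><span class="pre">"ts-counts"</span></code> is illustrative), building a persistent timestamped key-value store and reading a value together with its timestamp can look like this:</p>
                <pre class="line-numbers"><code class="language-java">import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

StoreBuilder<TimestampedKeyValueStore<String, Long>> builder =
    Stores.timestampedKeyValueStoreBuilder(
        Stores.persistentTimestampedKeyValueStore("ts-counts"),
        Serdes.String(),
        Serdes.Long());

// later, inside a processor that has the store attached:
TimestampedKeyValueStore<String, Long> store = context.getStateStore("ts-counts");
ValueAndTimestamp<Long> latest = store.get("hello");
if (latest != null) {
    System.out.println(latest.value() + " @ " + latest.timestamp());
}</code></pre>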
                <p><b>Upgrade note:</b> All users upgrade with a single rolling bounce per instance.</p>
                <ul class="first simple">
                    <li>For Processor API users, nothing changes in existing applications, and you
                        have the option of using the timestamped stores.</li>
                    <li>For DSL operators, store data is upgraded lazily in the background.</li>
                    <li>No upgrade happens if you provide a custom XxxBytesStoreSupplier, but you can opt in
                        by implementing the <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/TimestampedBytesStore.html">TimestampedBytesStore</a>
                        interface. In this case, the old format is retained, and Streams uses a proxy store
                        that removes/adds timestamps on read/write.</li>
                </ul>
            </div>
<div class="section" id="versioned-state-stores"> |
|
<span id="streams-developer-guide-state-store-versioned"></span><h3><a class="toc-backref" href="#id12">Versioned Key-Value State Stores</a><a class="headerlink" href="#versioned-state-stores" title="Permalink to this headline"></a></h3> |
|
<p>Versioned key-value state stores are available since Kafka Streams 3.5. |
|
Rather than storing a single record version (value and timestamp) per key, |
|
versioned state stores may store multiple record versions per key. This |
|
allows versioned state stores to support timestamped retrieval operations |
|
to return the latest record (per key) as of a specified timestamp.</p> |
|
<p>You can create a persistent, versioned state store by passing a |
|
<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentVersionedKeyValueStore(java.lang.String,java.time.Duration)">VersionedBytesStoreSupplier</a> |
|
to the |
|
<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#versionedKeyValueStoreBuilder(java.lang.String,java.time.Duration)">versionedKeyValueStoreBuilder</a>, |
|
or by implementing your own |
|
<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/VersionedKeyValueStore.html">VersionedKeyValueStore</a>.</p> |
|
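                <p>As a minimal sketch (the store name <code class="docutils literal"><span class="pre">"versioned-counts"</span></code> and the one-day history retention are illustrative), building such a store and issuing a timestamped lookup can look like this:</p>
                <pre class="line-numbers"><code class="language-java">import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.VersionedRecord;

StoreBuilder<VersionedKeyValueStore<String, Long>> builder =
    Stores.versionedKeyValueStoreBuilder(
        Stores.persistentVersionedKeyValueStore("versioned-counts", Duration.ofDays(1)),
        Serdes.String(),
        Serdes.Long());

// later, inside a processor that has the store attached:
VersionedKeyValueStore<String, Long> store = context.getStateStore("versioned-counts");
// latest record version for the key
VersionedRecord<Long> latest = store.get("hello");
// latest record version as of the given (millisecond) timestamp
VersionedRecord<Long> asOf = store.get("hello", 1690000000000L);
if (asOf != null) {
    System.out.println(asOf.value() + " @ " + asOf.timestamp());
}</code></pre>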
                <p>Each versioned store has an associated, fixed-duration <em>history retention</em>
                    parameter which specifies how long old record versions should be kept for.
                    In particular, a versioned store guarantees to return accurate results for
                    timestamped retrieval operations where the timestamp being queried is within
                    history retention of the current observed stream time.</p>
                <p>History retention also doubles as the store's <em>grace period</em>, which determines
                    how far back in time out-of-order writes to the store will be accepted. A
                    versioned store will not accept writes (inserts, updates, or deletions) if
                    the timestamp associated with the write is older than the current observed
                    stream time by more than the grace period. Stream time in this context is
                    tracked per-partition, rather than per-key, which means it's important
                    that the grace period (i.e., history retention) be set high enough to
                    accommodate a record with one key arriving out-of-order relative to a
                    record for another key.</p>
                <p>Because the memory footprint of versioned key-value stores is higher than
                    that of non-versioned key-value stores, you may want to adjust your
                    <a class="reference internal" href="memory-mgmt.html#streams-developer-guide-memory-management-rocksdb"><span class="std std-ref">RocksDB memory settings</span></a>
                    accordingly. Benchmarking your application with versioned stores is also
                    advised as performance is expected to be worse than when using non-versioned
                    stores.</p>
                <p>Versioned stores do not support caching or interactive queries at this time.
                    Also, window stores and global tables may not be versioned.</p>
                <p><b>Upgrade note:</b> Versioned state stores are opt-in only; no automatic
                    upgrades from non-versioned to versioned stores will take place.</p>
                <p>Upgrades are supported from persistent, non-versioned key-value stores
                    to persistent, versioned key-value stores as long as the original store
                    has the same changelog topic format as the versioned store being upgraded
                    to. Both persistent
                    <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore(java.lang.String)">key-value stores</a>
                    and <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedKeyValueStore(java.lang.String)">timestamped key-value stores</a>
                    share the same changelog topic format as
                    <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentVersionedKeyValueStore(java.lang.String,java.time.Duration)">persistent versioned key-value stores</a>,
                    and therefore both are eligible for upgrades.</p>
                <p>If you wish to upgrade an application using persistent, non-versioned
                    key-value stores to use persistent, versioned key-value stores
                    instead, you can perform the following procedure:</p>
                <ul class="first simple">
                    <li>Stop all application instances, and
                        <a class="reference internal" href="app-reset-tool.html#streams-developer-guide-reset-local-environment"><span class="std std-ref">clear any local state directories</span></a>
                        for the store(s) being upgraded.</li>
                    <li>Update your application code to use versioned stores where desired.</li>
                    <li>Update your changelog topic configs, for the relevant state stores,
                        to set the value of <code class="docutils literal"><span class="pre">min.compaction.lag.ms</span></code>
                        to be at least your desired history retention. History retention plus
                        one day is recommended as a buffer for the use of broker wall-clock time
                        during compaction.</li>
                    <li>Restart your application instances and allow time for the versioned
                        stores to rebuild state from the changelog.</li>
                </ul>
            </div>
<div class="section" id="implementing-custom-state-stores"> |
|
<span id="streams-developer-guide-state-store-custom"></span><h3><a class="toc-backref" href="#id7">Implementing Custom State Stores</a><a class="headerlink" href="#implementing-custom-state-stores" title="Permalink to this headline"></a></h3> |
|
<p>You can use the <a class="reference internal" href="#streams-developer-guide-state-store-defining"><span class="std std-ref">built-in state store types</span></a> or implement your own. |
|
The primary interface to implement for the store is |
|
<code class="docutils literal"><span class="pre">org.apache.kafka.streams.processor.StateStore</span></code>. Kafka Streams also has a few extended interfaces such |
|
as <code class="docutils literal"><span class="pre">KeyValueStore</span></code> and <code class="docutils literal"><span class="pre">VersionedKeyValueStore</span></code>.</p> |
|
<p>Note that your customized <code class="docutils literal"><span class="pre">org.apache.kafka.streams.processor.StateStore</span></code> implementation also needs to provide the logic on how to restore the state |
|
via the <code class="docutils literal"><span class="pre">org.apache.kafka.streams.processor.StateRestoreCallback</span></code> or <code class="docutils literal"><span class="pre">org.apache.kafka.streams.processor.BatchingStateRestoreCallback</span></code> interface. |
|
Details on how to instantiate these interfaces can be found in the <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/processor/StateStore.html">javadocs</a>.</p> |
|
<p>You also need to provide a “builder” for the store by implementing the |
|
<code class="docutils literal"><span class="pre">org.apache.kafka.streams.state.StoreBuilder</span></code> interface, which Kafka Streams uses to create instances of |
|
your store.</p> |
|
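                <p>To give a feel for the surface area involved, here is a minimal sketch of a custom store skeleton. The class <code class="docutils literal"><span class="pre">MyTransientStore</span></code> is hypothetical; a real implementation would also expose read/write methods and handle serialization, and depending on your Kafka Streams version you may additionally need to implement the deprecated <code class="docutils literal"><span class="pre">init(ProcessorContext, StateStore)</span></code> overload:</p>
                <pre class="line-numbers"><code class="language-java">import java.util.Map;
import java.util.TreeMap;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;

public class MyTransientStore implements StateStore {
    private final String name;
    private final Map<Bytes, byte[]> map = new TreeMap<>();
    private volatile boolean open = false;

    public MyTransientStore(final String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public void init(final StateStoreContext context, final StateStore root) {
        // register the store; the restore callback replays the changelog into the map
        context.register(root, (key, value) -> map.put(Bytes.wrap(key), value));
        open = true;
    }

    @Override
    public void flush() {
        // nothing to flush for a purely in-memory store
    }

    @Override
    public void close() {
        open = false;
    }

    @Override
    public boolean persistent() {
        return false;
    }

    @Override
    public boolean isOpen() {
        return open;
    }
}</code></pre>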
            </div>
        </div>
<div class="section" id="accessing-processor-context"> |
|
<h2><a class="toc-backref" href="#id10">Accessing Processor Context</a><a class="headerlink" href="#accessing-processor-context" title="Permalink to this headline"></a></h2> |
|
<p>As we have mentioned in the <a href=#defining-a-stream-processor>Defining a Stream Processor</a> section, a <code>ProcessorContext</code> control the processing workflow, such as scheduling a punctuation function, and committing the current processed state.</p> |
|
<p>This object can also be used to access the metadata related with the application like |
|
<code class="docutils literal"><span class="pre">applicationId</span></code>, <code class="docutils literal"><span class="pre">taskId</span></code>, |
|
and <code class="docutils literal"><span class="pre">stateDir</span></code>, and also record related metadata as <code class="docutils literal"><span class="pre">topic</span></code>, |
|
<code class="docutils literal"><span class="pre">partition</span></code>, <code class="docutils literal"><span class="pre">offset</span></code>, <code class="docutils literal"><span class="pre">timestamp</span></code> and |
|
<code class="docutils literal"><span class="pre">headers</span></code>.</p> |
|
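            <p>As a small sketch, assuming <code class="docutils literal"><span class="pre">context</span></code> is the <code class="docutils literal"><span class="pre">ProcessorContext</span></code> saved in <code class="docutils literal"><span class="pre">init()</span></code>, the metadata can be read like this inside <code class="docutils literal"><span class="pre">process()</span></code>:</p>
            <pre class="line-numbers"><code class="language-java">// application-level metadata is always available
System.out.println("appId=" + context.applicationId() + ", task=" + context.taskId());

// record-level metadata is only present when processing a record that
// originates from an input topic, hence the Optional
context.recordMetadata().ifPresent(meta ->
    System.out.println(meta.topic() + "-" + meta.partition() + "@" + meta.offset()));</code></pre>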
            <p>Here is an example of how to add a new header to the record before forwarding it downstream (again assuming the processor saved its <code class="docutils literal"><span class="pre">ProcessorContext</span></code> in a <code class="docutils literal"><span class="pre">context</span></code> field during <code class="docutils literal"><span class="pre">init()</span></code>):</p>
            <pre class="line-numbers"><code class="language-java">@Override
public void process(final Record<String, String> record) {
    // add a header to the record; header values are raw bytes
    record.headers().add("key", "value".getBytes(StandardCharsets.UTF_8));
    context.forward(record);
}</code></pre>
<div class="section" id="connecting-processors-and-state-stores"> |
|
<h2><a class="toc-backref" href="#id8">Connecting Processors and State Stores</a><a class="headerlink" href="#connecting-processors-and-state-stores" title="Permalink to this headline"></a></h2> |
|
<p>Now that a <a class="reference internal" href="#streams-developer-guide-stream-processor"><span class="std std-ref">processor</span></a> (WordCountProcessor) and the |
|
state stores have been defined, you can construct the processor topology by connecting these processors and state stores together by |
|
using the <code class="docutils literal"><span class="pre">Topology</span></code> instance. In addition, you can add source processors with the specified Kafka topics |
|
to generate input data streams into the topology, and sink processors with the specified Kafka topics to generate |
|
output data streams out of the topology.</p> |
|
<p>Here is an example implementation:</p> |
|
<pre class="line-numbers"><code class="language-java">Topology builder = new Topology(); |
|
// add the source processor node that takes Kafka topic "source-topic" as input |
|
builder.addSource("Source", "source-topic") |
|
// add the WordCountProcessor node which takes the source processor as its upstream processor |
|
.addProcessor("Process", () -> new WordCountProcessor(), "Source") |
|
// add the count store associated with the WordCountProcessor processor |
|
.addStateStore(countStoreBuilder, "Process") |
|
// add the sink processor node that takes Kafka topic "sink-topic" as output |
|
// and the WordCountProcessor node as its upstream processor |
|
.addSink("Sink", "sink-topic", "Process");</code></pre> |
|
                <p>Here is a quick explanation of this example:</p>
                <ul class="simple">
                    <li>A source processor node named <code class="docutils literal"><span class="pre">"Source"</span></code> is added to the topology using the <code class="docutils literal"><span class="pre">addSource</span></code> method, with one Kafka topic
                        <code class="docutils literal"><span class="pre">"source-topic"</span></code> fed to it.</li>
                    <li>A processor node named <code class="docutils literal"><span class="pre">"Process"</span></code> with the pre-defined <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> logic is then added as the downstream
                        processor of the <code class="docutils literal"><span class="pre">"Source"</span></code> node using the <code class="docutils literal"><span class="pre">addProcessor</span></code> method.</li>
                    <li>A predefined persistent key-value state store is created and associated with the <code class="docutils literal"><span class="pre">"Process"</span></code> node, using
                        <code class="docutils literal"><span class="pre">countStoreBuilder</span></code>.</li>
                    <li>A sink processor node is then added to complete the topology using the <code class="docutils literal"><span class="pre">addSink</span></code> method, taking the <code class="docutils literal"><span class="pre">"Process"</span></code> node
                        as its upstream processor and writing to a separate <code class="docutils literal"><span class="pre">"sink-topic"</span></code> Kafka topic (note that users can also use another overloaded variant of <code class="docutils literal"><span class="pre">addSink</span></code>
                        to dynamically determine the Kafka topic to write to for each received record from the upstream processor).</li>
                </ul>
                <p>In some cases, it may be more convenient to add and connect a state store at the same time as you add the processor to the topology.
                    This can be done by implementing <code class="docutils literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the <code class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
                    instead of calling <code class="docutils literal"><span class="pre">Topology#addStateStore()</span></code>, like this:
                </p>
<pre class="line-numbers"><code class="language-java">Topology builder = new Topology(); |
|
// add the source processor node that takes Kafka "source-topic" as input |
|
builder.addSource("Source", "source-topic") |
|
// add the WordCountProcessor node which takes the source processor as its upstream processor. |
|
// the ProcessorSupplier provides the count store associated with the WordCountProcessor |
|
.addProcessor("Process", new ProcessorSupplier<String, String, String, String>() { |
|
public Processor<String, String, String, String> get() { |
|
return new WordCountProcessor(); |
|
} |
|
|
|
public Set<StoreBuilder<?>> stores() { |
|
final StoreBuilder<KeyValueStore<String, Long>> countsStoreBuilder = |
|
Stores |
|
.keyValueStoreBuilder( |
|
Stores.persistentKeyValueStore("Counts"), |
|
Serdes.String(), |
|
Serdes.Long() |
|
); |
|
return Collections.singleton(countsStoreBuilder); |
|
} |
|
}, "Source") |
|
// add the sink processor node that takes Kafka topic "sink-topic" as output |
|
// and the WordCountProcessor node as its upstream processor |
|
.addSink("Sink", "sink-topic", "Process");</code></pre> |
|
                <p>This allows a processor to "own" state stores, effectively encapsulating their usage from the user wiring the topology.
                    Multiple processors that share a state store may provide the same store with this technique, as long as the <code class="docutils literal"><span class="pre">StoreBuilder</span></code> is the same <em>instance</em>.</p>
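                <p>As a sketch of the shared-store case, both suppliers return the same <code class="docutils literal"><span class="pre">StoreBuilder</span></code> instance from <code class="docutils literal"><span class="pre">stores()</span></code>; the <code class="docutils literal"><span class="pre">withStore()</span></code> helper below is hypothetical and not part of the Kafka Streams API:</p>
                <pre class="line-numbers"><code class="language-java">import java.util.Collections;
import java.util.Set;
import java.util.function.Supplier;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// one StoreBuilder instance, handed to both suppliers
final StoreBuilder<KeyValueStore<String, Long>> sharedStoreBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("shared-counts"),
        Serdes.String(),
        Serdes.Long());

// hypothetical helper that pairs a processor constructor with the shared builder
static <KIn, VIn, KOut, VOut> ProcessorSupplier<KIn, VIn, KOut, VOut> withStore(
        final Supplier<Processor<KIn, VIn, KOut, VOut>> processor,
        final StoreBuilder<?> builder) {
    return new ProcessorSupplier<KIn, VIn, KOut, VOut>() {
        @Override
        public Processor<KIn, VIn, KOut, VOut> get() {
            return processor.get();
        }

        @Override
        public Set<StoreBuilder<?>> stores() {
            return Collections.singleton(builder);
        }
    };
}

// both processors can now access the same "shared-counts" store
builder.addProcessor("ProcessA", withStore(WordCountProcessor::new, sharedStoreBuilder), "Source");
builder.addProcessor("ProcessB", withStore(WordCountProcessor::new, sharedStoreBuilder), "Source");</code></pre>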
                <p>In these topologies, the <code class="docutils literal"><span class="pre">"Process"</span></code> stream processor node is considered a downstream processor of the <code class="docutils literal"><span class="pre">"Source"</span></code> node, and an
                    upstream processor of the <code class="docutils literal"><span class="pre">"Sink"</span></code> node. As a result, whenever the <code class="docutils literal"><span class="pre">"Source"</span></code> node forwards a newly fetched record from
                    Kafka to its downstream <code class="docutils literal"><span class="pre">"Process"</span></code> node, the <code class="docutils literal"><span class="pre">WordCountProcessor#process()</span></code> method is triggered to process the record and
                    update the associated state store. Whenever <code class="docutils literal"><span class="pre">context#forward()</span></code> is called in the
                    <code class="docutils literal"><span class="pre">WordCountProcessor#punctuate()</span></code> method, the aggregate key-value pair will be sent via the <code class="docutils literal"><span class="pre">"Sink"</span></code> processor node to
                    the Kafka topic <code class="docutils literal"><span class="pre">"sink-topic"</span></code>. Note that in the <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> implementation, you must refer to the
                    same store name <code class="docutils literal"><span class="pre">"Counts"</span></code> when accessing the key-value store, otherwise an exception will be thrown at runtime,
                    indicating that the state store cannot be found. If the state store is not associated with the processor
                    in the <code class="docutils literal"><span class="pre">Topology</span></code> code, accessing it in the processor&#8217;s <code class="docutils literal"><span class="pre">init()</span></code> method will also throw an exception at
                    runtime, indicating the state store is not accessible from this processor.</p>
                <p>Note that the <code class="docutils literal"><span class="pre">Topology#addProcessor</span></code> function takes a <code class="docutils literal"><span class="pre">ProcessorSupplier</span></code> as argument, and that the supplier pattern requires that a new
                    <code class="docutils literal"><span class="pre">Processor</span></code> instance is returned each time <code class="docutils literal"><span class="pre">ProcessorSupplier#get()</span></code> is called. Creating a single <code class="docutils literal"><span class="pre">Processor</span></code>
                    object and returning the same object reference from every <code class="docutils literal"><span class="pre">ProcessorSupplier#get()</span></code> call violates the supplier pattern and leads to runtime exceptions,
                    so remember not to provide a singleton <code class="docutils literal"><span class="pre">Processor</span></code> instance to the <code class="docutils literal"><span class="pre">Topology</span></code>.</p>
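                <p>As a short sketch of the difference:</p>
                <pre class="line-numbers"><code class="language-java">// WRONG: every call to get() returns the same Processor instance
final Processor<String, String, String, String> singleton = new WordCountProcessor();
builder.addProcessor("Process", () -> singleton, "Source");

// RIGHT: every call to get() returns a fresh Processor instance
builder.addProcessor("Process", WordCountProcessor::new, "Source");</code></pre>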
                <p>Now that you have fully defined your processor topology in your application, you can proceed to
                    <a class="reference internal" href="running-app.html#streams-developer-guide-execution"><span class="std std-ref">running the Kafka Streams application</span></a>.</p>
            </div>
        </div>

    </div>
    </div>

    <div class="pagination">
        <a href="/{{version}}/documentation/streams/developer-guide/dsl-api" class="pagination__btn pagination__btn__prev">Previous</a>
        <a href="/{{version}}/documentation/streams/developer-guide/datatypes" class="pagination__btn pagination__btn__next">Next</a>
    </div>
</script>
<!--#include virtual="../../../includes/_header.htm" --> |
|
<!--#include virtual="../../../includes/_top.htm" --> |
|
<div class="content documentation "> |
|
<!--#include virtual="../../../includes/_nav.htm" --> |
|
<div class="right"> |
|
<!--//#include virtual="../../../includes/_docs_banner.htm" --> |
|
<ul class="breadcrumbs"> |
|
<li><a href="/documentation">Documentation</a></li> |
|
<li><a href="/documentation/streams">Kafka Streams</a></li> |
|
<li><a href="/documentation/streams/developer-guide/">Developer Guide</a></li> |
|
</ul> |
|
<div class="p-content"></div> |
|
</div> |
|
</div> |
|
<!--#include virtual="../../../includes/_footer.htm" --> |
|
<script>
$(function() {
    // Show selected style on nav item
    $('.b-nav__streams').addClass('selected');

    // sticky secondary nav
    var $navbar = $(".sub-nav-sticky"),
        y_pos = $navbar.offset().top,
        height = $navbar.height();

    $(window).scroll(function() {
        var scrollTop = $(window).scrollTop();

        if (scrollTop > y_pos - height) {
            $navbar.addClass("navbar-fixed");
        } else if (scrollTop <= y_pos) {
            $navbar.removeClass("navbar-fixed");
        }
    });

    // Display docs subnav items
    $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
});
</script>