<span id="streams-developer-guide-memory-management"></span><h1>Memory Management<a class="headerlink" href="#memory-management" title="Permalink to this headline"></a></h1>
<p>You can specify the total memory (RAM) size used for internal caching and compacting of records. This caching happens
before the records are written to state stores or forwarded downstream to other nodes.</p>
<p>The record caches are implemented slightly differently in the DSL and the Processor API.</p>
<span id="streams-developer-guide-memory-management-record-cache"></span><h2><a class="toc-backref" href="#id1">Record caches in the DSL</a><a class="headerlink" href="#record-caches-in-the-dsl" title="Permalink to this headline"></a></h2>
<p>You can specify the total memory (RAM) size of the record cache for an instance of the processing topology. It is leveraged
by the following <code class="docutils literal"><span class="pre">KTable</span></code> instances:</p>
<ul class="simple">
<li>Source <code class="docutils literal"><span class="pre">KTable</span></code>: <code class="docutils literal"><span class="pre">KTable</span></code> instances that are created via <code class="docutils literal"><span class="pre">StreamsBuilder#table()</span></code> or <code class="docutils literal"><span class="pre">StreamsBuilder#globalTable()</span></code>.</li>
<li>Aggregation <code class="docutils literal"><span class="pre">KTable</span></code>: instances of <code class="docutils literal"><span class="pre">KTable</span></code> that are created as a result of <a class="reference internal" href="dsl-api.html#streams-developer-guide-dsl-aggregating"><span class="std std-ref">aggregations</span></a>.</li>
</ul>
<p>For such <code class="docutils literal"><span class="pre">KTable</span></code> instances, the record cache is used for:</p>
<ul class="simple">
<li>Internal caching and compacting of output records before they are written by the underlying stateful
<a class="reference internal" href="../concepts.html#streams-concepts-processor"><span class="std std-ref">processor node</span></a> to its internal state stores.</li>
<li>Internal caching and compacting of output records before they are forwarded from the underlying stateful
<a class="reference internal" href="../concepts.html#streams-concepts-processor"><span class="std std-ref">processor node</span></a> to any of its downstream processor nodes.</li>
</ul>
<p>Use the following example to understand the behaviors with and without record caching. In this example, the input is a
<code class="docutils literal"><span class="pre">KStream&lt;String,</span> <span class="pre">Integer&gt;</span></code> with the records <code class="docutils literal"><span class="pre">&lt;K,V&gt;:</span> <span class="pre">&lt;A,</span> <span class="pre">1&gt;,</span> <span class="pre">&lt;D,</span> <span class="pre">5&gt;,</span> <span class="pre">&lt;A,</span> <span class="pre">20&gt;,</span> <span class="pre">&lt;A,</span> <span class="pre">300&gt;</span></code>. The focus in this example is
on the records with key == <code class="docutils literal"><span class="pre">A</span></code>.</p>
<ul>
<li><p class="first">An <a class="reference internal" href="dsl-api.html#streams-developer-guide-dsl-aggregating"><span class="std std-ref">aggregation</span></a> computes the sum of record values, grouped by key, for
the input and returns a <code class="docutils literal"><span class="pre">KTable&lt;String,</span> <span class="pre">Integer&gt;</span></code>.</p>
<blockquote>
<div><ul class="simple">
<li><strong>Without caching</strong>: a sequence of output records is emitted for key <code class="docutils literal"><span class="pre">A</span></code> that represents the changes in the
resulting aggregation table. The parentheses (<code class="docutils literal"><span class="pre">()</span></code>) denote changes, where the left number is the new aggregate value
and the right number is the old aggregate value: <code class="docutils literal"><span class="pre">&lt;A,</span> <span class="pre">(1,</span> <span class="pre">null)&gt;,</span> <span class="pre">&lt;A,</span> <span class="pre">(21,</span> <span class="pre">1)&gt;,</span> <span class="pre">&lt;A,</span> <span class="pre">(321,</span> <span class="pre">21)&gt;</span></code>.</li>
<li><strong>With caching</strong>: the output records for key <code class="docutils literal"><span class="pre">A</span></code> would likely be compacted in the cache,
leading to a single output record <code class="docutils literal"><span class="pre">&lt;A,</span> <span class="pre">(321,</span> <span class="pre">null)&gt;</span></code>. This record is written to the aggregation’s internal state
store and forwarded to any downstream operations.</li>
</ul>
</div></blockquote>
</li>
</ul>
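<p>To make the with/without-caching contrast concrete, here is a minimal plain-Java sketch (a simulation, not Kafka Streams itself) of how the cache compacts per-key aggregate updates before they are emitted:</p>

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CacheCompactionDemo {
    // Without caching: every aggregate update is emitted as a change record.
    static List<String> withoutCaching(List<Map.Entry<String, Integer>> input) {
        Map<String, Integer> sums = new LinkedHashMap<>();
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Integer> r : input) {
            Integer old = sums.get(r.getKey());
            int updated = (old == null ? 0 : old) + r.getValue();
            sums.put(r.getKey(), updated);
            out.add("<" + r.getKey() + ", (" + updated + ", " + old + ")>");
        }
        return out;
    }

    // With caching: later updates for the same key overwrite earlier ones,
    // so only the final aggregate per key is emitted when the cache flushes.
    static List<String> withCaching(List<Map.Entry<String, Integer>> input) {
        Map<String, Integer> cache = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> r : input) {
            cache.merge(r.getKey(), r.getValue(), Integer::sum);
        }
        List<String> out = new ArrayList<>();
        cache.forEach((k, v) -> out.add("<" + k + ", (" + v + ", null)>"));
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = List.of(
                Map.entry("A", 1), Map.entry("D", 5),
                Map.entry("A", 20), Map.entry("A", 300));
        System.out.println(withoutCaching(input));
        System.out.println(withCaching(input));
    }
}
```

<p>For key <code class="docutils literal"><span class="pre">A</span></code>, the first method emits three change records while the second emits only the compacted <code class="docutils literal"><span class="pre">&lt;A,</span> <span class="pre">(321,</span> <span class="pre">null)&gt;</span></code>, mirroring the example above.</p>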
<p>The cache size is specified through the <code class="docutils literal"><span class="pre">cache.max.bytes.buffering</span></code> parameter, which is a global setting per
processing topology:</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Enable record cache of size 10 MB.</span>
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
</pre></div>
</div>
<p>This parameter controls the number of bytes allocated for caching. Specifically, for a processor topology instance with
<code class="docutils literal"><span class="pre">T</span></code> threads and <code class="docutils literal"><span class="pre">C</span></code> bytes allocated for caching, each thread will have an even split of <code class="docutils literal"><span class="pre">C/T</span></code> bytes to construct its own
cache and use as it sees fit among its tasks. This means that there are as many caches as there are threads, but caches are not
shared across threads.</p>
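<p>As a quick sanity check of the even <code class="docutils literal"><span class="pre">C/T</span></code> split (the method name here is illustrative, not a Kafka API):</p>

```java
public class CacheShare {
    // Even split of the total cache C across T threads, as described above.
    static long perThreadCacheBytes(long totalCacheBytes, int numThreads) {
        return totalCacheBytes / numThreads;
    }

    public static void main(String[] args) {
        long total = 10 * 1024 * 1024L; // cache.max.bytes.buffering = 10 MB
        int threads = 4;                // num.stream.threads = 4
        // Each thread gets 2.5 MB (2621440 bytes) for its own cache.
        System.out.println(perThreadCacheBytes(total, threads));
    }
}
```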
<p>The basic API for the cache is made of <code class="docutils literal"><span class="pre">put()</span></code> and <code class="docutils literal"><span class="pre">get()</span></code> calls. Records are
evicted using a simple LRU scheme once the cache size limit is reached. The first time a keyed record <code class="docutils literal"><span class="pre">R1</span> <span class="pre">=</span> <span class="pre">&lt;K1,</span> <span class="pre">V1&gt;</span></code>
finishes processing at a node, it is marked as dirty in the cache. Any other keyed record <code class="docutils literal"><span class="pre">R2</span> <span class="pre">=</span> <span class="pre">&lt;K1,</span> <span class="pre">V2&gt;</span></code> with the
same key <code class="docutils literal"><span class="pre">K1</span></code> that is processed on that node during that time will overwrite <code class="docutils literal"><span class="pre">&lt;K1,</span> <span class="pre">V1&gt;</span></code>; this is referred to as
“being compacted”. This has the same effect as
<a class="reference external" href="https://kafka.apache.org/documentation.html#compaction">Kafka’s log compaction</a>, but happens earlier, while the
records are still in memory, and within your client-side application rather than on the server side (i.e., the Kafka
broker). After flushing, <code class="docutils literal"><span class="pre">R2</span></code> is forwarded to the next processing node and then written to the local state store.</p>
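<p>The overwrite-and-evict behavior described above can be sketched with a plain <code class="docutils literal"><span class="pre">LinkedHashMap</span></code>-based LRU cache (a simplification: the real record cache tracks bytes, not entry counts, and also marks entries dirty for flushing):</p>

```java
import java.util.LinkedHashMap;
import java.util.Map;

// A tiny LRU cache: put() for an existing key overwrites ("compacts") the
// old value; once the capacity is exceeded, the least recently used entry
// is evicted.
public class LruCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    public LruCache(int maxEntries) {
        super(16, 0.75f, true); // access-order iteration gives LRU semantics
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries; // evict when the size limit is reached
    }
}
```

<p>With a capacity of 3, inserting a fourth distinct key evicts the least recently used entry, while inserting the same key twice simply overwrites the first value, mirroring how <code class="docutils literal"><span class="pre">R2</span></code> overwrites <code class="docutils literal"><span class="pre">R1</span></code> above.</p>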
<p>The semantics of caching are that data is flushed to the state store and forwarded to the next downstream processor node
whenever the earlier of the <code class="docutils literal"><span class="pre">commit.interval.ms</span></code> or <code class="docutils literal"><span class="pre">cache.max.bytes.buffering</span></code> (cache pressure) limits is reached. Both
<code class="docutils literal"><span class="pre">commit.interval.ms</span></code> and <code class="docutils literal"><span class="pre">cache.max.bytes.buffering</span></code> are global parameters. As such, it is not possible to specify
different parameters for individual nodes.</p>
<p>Here are example settings for both parameters based on desired scenarios.</p>
<ul>
<li><p class="first">To turn off caching, set the cache size to zero:</p>
<blockquote>
<div><div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Disable record cache</span>
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
</pre></div>
</div>
<p>Turning off caching might result in high write traffic to the underlying RocksDB store.
With default settings, caching is enabled within Kafka Streams but RocksDB caching is disabled.
Thus, to avoid high write traffic, it is recommended to enable RocksDB caching if Kafka Streams caching is turned off.</p>
<p>For example, the RocksDB Block Cache could be set to 100 MB and the Write Buffer size to 32 MB. For more information, see
the <a class="reference internal" href="config-streams.html#streams-developer-guide-rocksdb-config"><span class="std std-ref">RocksDB config</span></a>.</p>
</div></blockquote>
</li>
<li><p class="first">To enable caching but still have an upper bound on how long records will be cached, you can set the commit interval. In this example, it is set to 1000 milliseconds:</p>
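<p>For example, the setting could look like this (the snippet uses the plain config key via <code class="docutils literal"><span class="pre">java.util.Properties</span></code>; in an application you would typically use the <code class="docutils literal"><span class="pre">StreamsConfig.COMMIT_INTERVAL_MS_CONFIG</span></code> constant):</p>

```java
import java.util.Properties;

public class CommitIntervalConfig {
    public static void main(String[] args) {
        Properties streamsConfiguration = new Properties();
        // Flush the cache (at the latest) every 1000 milliseconds.
        streamsConfiguration.put("commit.interval.ms", 1000);
        System.out.println(streamsConfiguration.get("commit.interval.ms"));
    }
}
```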
<p>The effect of these two configurations is described in the figure below. The records are shown using 4 keys: blue, red, yellow, and green. Assume the cache has space for only 3 keys.</p>
<ul>
<li><p class="first">When the cache is disabled (a), all of the input records will be output.</p>
</li>
<li><p class="first">When the cache is enabled (b):</p>
<blockquote>
<div><ul class="simple">
<li>Most records are output at the end of commit intervals (e.g., at <code class="docutils literal"><span class="pre">t1</span></code> a single blue record is output, which is the final overwrite of the blue key up to that time).</li>
<li>Some records are output because of cache pressure (i.e., before the end of a commit interval). For example, see the red record before <code class="docutils literal"><span class="pre">t2</span></code>. With smaller cache sizes we expect cache pressure to be the primary factor that dictates when records are output. With large cache sizes, the commit interval will be the primary factor.</li>
<li>The total number of records output has been reduced from 15 to 8.</li>
</ul>
</div></blockquote>
</li>
</ul>
</li>
</ul>
<span id="streams-developer-guide-memory-management-state-store-cache"></span><h2><a class="toc-backref" href="#id2">Record caches in the Processor API</a><a class="headerlink" href="#record-caches-in-the-processor-api" title="Permalink to this headline"></a></h2>
<p>You can specify the total memory (RAM) size of the record cache for an instance of the processing topology. It is used
for internal caching and compacting of output records before they are written from a stateful processor node to its
state stores.</p>
<p>The record cache in the Processor API does not cache or compact any output records that are being forwarded downstream.
This means that all downstream processor nodes can see all records, whereas the state stores see a reduced number of records.
This does not impact correctness of the system, but is a performance optimization for the state stores. For example, with the
Processor API you can store a record in a state store while forwarding a different value downstream.</p>
<p>Following from the example first shown in section <a class="reference internal" href="processor-api.html#streams-developer-guide-state-store"><span class="std std-ref">State Stores</span></a>, to enable caching, you can
add the <code class="docutils literal"><span class="pre">withCachingEnabled</span></code> call (note that caches are disabled by default and there is no explicit <code class="docutils literal"><span class="pre">withDisableCaching</span></code>
call).</p>
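<p>A sketch of enabling caching on a store builder might look like the following (the store name <code class="docutils literal"><span class="pre">"Counts"</span></code> and the serdes are taken from the State Stores example):</p>

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class CachedStoreExample {
    // A persistent key-value store named "Counts" with the record cache
    // turned on via withCachingEnabled().
    static StoreBuilder<KeyValueStore<String, Long>> cachedCountStore() {
        return Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("Counts"),
                    Serdes.String(),
                    Serdes.Long())
                .withCachingEnabled();
    }
}
```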
<h2><a class="toc-backref" href="#id3">Other memory usage</a><a class="headerlink" href="#other-memory-usage" title="Permalink to this headline"></a></h2>
<p>There are other modules inside Apache Kafka that allocate memory during runtime. They include the following:</p>
<ul class="simple">
<li>Producer buffering, managed by the producer config <code class="docutils literal"><span class="pre">buffer.memory</span></code>.</li>
<li>Consumer buffering, currently not strictly managed, but can be indirectly controlled by fetch size, i.e.,
<code class="docutils literal"><span class="pre">fetch.max.bytes</span></code> and <code class="docutils literal"><span class="pre">fetch.max.wait.ms</span></code>.</li>
<li>Both producer and consumer also have separate TCP send / receive buffers that are not counted as the buffering memory.
These are controlled by the <code class="docutils literal"><span class="pre">send.buffer.bytes</span></code> / <code class="docutils literal"><span class="pre">receive.buffer.bytes</span></code> configs.</li>
<li>Deserialized objects buffering: after <code class="docutils literal"><span class="pre">consumer.poll()</span></code> returns records, they will be deserialized to extract the
timestamp and buffered in the streams space. Currently this is only indirectly controlled by
<code class="docutils literal"><span class="pre">buffered.records.per.partition</span></code>.</li>
<li>RocksDB’s own memory usage, both on-heap and off-heap; critical configs (for RocksDB version 4.1.0) include
<code class="docutils literal"><span class="pre">block_cache_size</span></code>, <code class="docutils literal"><span class="pre">write_buffer_size</span></code> and <code class="docutils literal"><span class="pre">max_write_buffer_number</span></code>. These can be specified through the
<code class="docutils literal"><span class="pre">rocksdb.config.setter</span></code> configuration.</li>
</ul>
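<p>For example, a RocksDB config setter that bounds the block cache and write buffers might look like the following sketch (the class name and the specific values are illustrative):</p>

```java
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(100 * 1024 * 1024L); // 100 MB block cache
        options.setTableFormatConfig(tableConfig);
        options.setWriteBufferSize(32 * 1024 * 1024L);     // 32 MB write buffer
        options.setMaxWriteBufferNumber(2);
    }
}
// Register it via:
// props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedMemoryRocksDBConfig.class);
```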
<p><strong>Iterators should be closed explicitly to release resources:</strong> Store iterators (e.g., <code class="docutils literal"><span class="pre">KeyValueIterator</span></code> and <code class="docutils literal"><span class="pre">WindowStoreIterator</span></code>) must be closed explicitly upon completion to release resources such as open file handles and in-memory read buffers. Alternatively, you can use a try-with-resources statement (available since JDK 7), since these iterator classes implement <code class="docutils literal"><span class="pre">Closeable</span></code>.</p>
<p class="last">Otherwise, the stream application’s memory usage will keep growing until it eventually hits an OOM error.</p>
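<p>For instance, a full scan over a key-value store can use try-with-resources so the iterator is always closed, even if an exception is thrown mid-iteration (the method and store here are illustrative):</p>

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class IteratorCleanupExample {
    // Prints all entries of a store; the try-with-resources block guarantees
    // that iterator.close() runs on every exit path.
    static void dumpStore(ReadOnlyKeyValueStore<String, Long> store) {
        try (KeyValueIterator<String, Long> iterator = store.all()) {
            while (iterator.hasNext()) {
                KeyValue<String, Long> entry = iterator.next();
                System.out.println(entry.key + " -> " + entry.value);
            }
        } // iterator.close() is called automatically here
    }
}
```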