Browse Source

MINOR: additional kip-182 doc updates

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Michael G. Noll <michael@confluent.io>, Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #3971 from dguy/kip-182-docs
pull/3971/merge
Damian Guy 7 years ago
parent
commit
cc84686a4a
  1. 152
      docs/streams/developer-guide.html

152
docs/streams/developer-guide.html

@ -1383,26 +1383,31 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r @@ -1383,26 +1383,31 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
// Java 8+ examples, using lambda expressions
// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream.aggregate(
KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */
.aggregate(
() -> 0L, /* initializer */
(aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */
Serdes.Long(), /* serde for aggregate value */
"time-windowed-aggregated-stream-store" /* state store name */);
Materialized.&lt;String, Long, WindowStore&lt;Bytes, byte[]&gt;&gt;as("time-windowed-aggregated-stream-store") /* state store name */
.withValueSerde(Serdes.Long())); /* serde for aggregate value */
// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionizedAggregatedStream = groupedStream.aggregate(
KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionizedAggregatedStream = groupedStream
.windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */
.aggregate(
() -> 0L, /* initializer */
(aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
(aggKey, leftAggValue, rightAggValue) -> leftAggValue + rightAggValue, /* session merger */
SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */
Serdes.Long(), /* serde for aggregate value */
"sessionized-aggregated-stream-store" /* state store name */);
Materialized.&lt;String, Long, SessionStore&lt;Bytes, byte[]&gt;&gt;as("sessionized-aggregated-stream-store") /* state store name */
.withValueSerde(Serdes.Long())); /* serde for aggregate value */
// Java 7 examples
// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream.aggregate(
KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */
.aggregate(
new Initializer&lt;Long&gt;() { /* initializer */
@Override
public Long apply() {
@ -1415,12 +1420,14 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r @@ -1415,12 +1420,14 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
return aggValue + newValue;
}
},
TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */
Serdes.Long(), /* serde for aggregate value */
"time-windowed-aggregated-stream-store" /* state store name */);
Materialized.&lt;String, Long, WindowStore&lt;Bytes, byte[]&gt;&gt;as("time-windowed-aggregated-stream-store") /* state store name */
.withValueSerde(Serdes.Long()) /* serde for aggregate value */
);
// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionizedAggregatedStream = groupedStream.aggregate(
KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionizedAggregatedStream = groupedStream
.windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */
.aggregate(
new Initializer&lt;Long&gt;() { /* initializer */
@Override
public Long apply() {
@ -1439,9 +1446,9 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r @@ -1439,9 +1446,9 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
return rightAggValue + leftAggValue;
}
},
SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */
Serdes.Long(), /* serde for aggregate value */
"sessionized-aggregated-stream-store" /* state store name */);
Materialized.&lt;String, Long, SessionStore&lt;Bytes, byte[]&gt;&gt;as("sessionized-aggregated-stream-store") /* state store name */
.withValueSerde(Serdes.Long()) /* serde for aggregate value */
);
</pre>
<p>
@ -1478,12 +1485,10 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r @@ -1478,12 +1485,10 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
KGroupedTable&lt;String, Long&gt; groupedTable = ...;
// Counting a KGroupedStream
KTable&lt;String, Long&gt; aggregatedStream = groupedStream.count(
"counted-stream-store" /* state store name */);
KTable&lt;String, Long&gt; aggregatedStream = groupedStream.count();
// Counting a KGroupedTable
KTable&lt;String, Long&gt; aggregatedTable = groupedTable.count(
"counted-table-store" /* state store name */);
KTable&lt;String, Long&gt; aggregatedTable = groupedTable.count();
</pre>
<p>
Detailed behavior for <code>KGroupedStream</code>:
@ -1518,14 +1523,14 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r @@ -1518,14 +1523,14 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
KGroupedStream&lt;String, Long&gt; groupedStream = ...;
// Counting a KGroupedStream with time-based windowing (here: with 5-minute tumbling windows)
KTable&lt;Windowed&lt;String&gt;, Long&gt; aggregatedStream = groupedStream.count(
TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */
"time-windowed-counted-stream-store" /* state store name */);
KTable&lt;Windowed&lt;String&gt;, Long&gt; aggregatedStream = groupedStream
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */
.count();
// Counting a KGroupedStream with session-based windowing (here: with 5-minute inactivity gaps)
KTable&lt;Windowed&lt;String&gt;, Long&gt; aggregatedStream = groupedStream.count(
SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */
"sessionized-counted-stream-store" /* state store name */);
KTable&lt;Windowed&lt;String&gt;, Long&gt; aggregatedStream = groupedStream
.windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */
.count();
</pre>
<p>
Detailed behavior:
@ -1561,14 +1566,14 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r @@ -1561,14 +1566,14 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
// Reducing a KGroupedStream
KTable&lt;String, Long&gt; aggregatedStream = groupedStream.reduce(
(aggValue, newValue) -> aggValue + newValue, /* adder */
"reduced-stream-store" /* state store name */);
(aggValue, newValue) -> aggValue + newValue /* adder */
);
// Reducing a KGroupedTable
KTable&lt;String, Long&gt; aggregatedTable = groupedTable.reduce(
(aggValue, newValue) -> aggValue + newValue, /* adder */
(aggValue, oldValue) -> aggValue - oldValue, /* subtractor */
"reduced-table-store" /* state store name */);
(aggValue, oldValue) -> aggValue - oldValue /* subtractor */
);
// Java 7 examples
@ -1580,8 +1585,8 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r @@ -1580,8 +1585,8 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
public Long apply(Long aggValue, Long newValue) {
return aggValue + newValue;
}
},
"reduced-stream-store" /* state store name */);
}
);
// Reducing a KGroupedTable
KTable&lt;String, Long&gt; aggregatedTable = groupedTable.reduce(
@ -1596,8 +1601,8 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r @@ -1596,8 +1601,8 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
public Long apply(Long aggValue, Long oldValue) {
return aggValue - oldValue;
}
},
"reduced-table-store" /* state store name */);
}
);
</pre>
<p>
Detailed behavior for <code>KGroupedStream</code>:
@ -1659,41 +1664,39 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r @@ -1659,41 +1664,39 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
// Java 8+ examples, using lambda expressions
// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream.reduce(
(aggValue, newValue) -> aggValue + newValue, /* adder */
TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */
"time-windowed-reduced-stream-store" /* state store name */);
KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */
.reduce((aggValue, newValue) -> aggValue + newValue /* adder */);
// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionzedAggregatedStream = groupedStream.reduce(
(aggValue, newValue) -> aggValue + newValue, /* adder */
SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */
"sessionized-reduced-stream-store" /* state store name */);
KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionzedAggregatedStream = groupedStream
.windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */
.reduce((aggValue, newValue) -> aggValue + newValue); /* adder */
// Java 7 examples
// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream.reduce(
KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */
.reduce(
new Reducer&lt;Long&gt;() { /* adder */
@Override
public Long apply(Long aggValue, Long newValue) {
return aggValue + newValue;
}
},
TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */
"time-windowed-reduced-stream-store" /* state store name */);
});
// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream.reduce(
KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream
.windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */
.reduce(
new Reducer&lt;Long&gt;() { /* adder */
@Override
public Long apply(Long aggValue, Long newValue) {
return aggValue + newValue;
}
},
SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */
"sessionized-reduced-stream-store" /* state store name */);
});
</pre>
<p>
@ -1723,16 +1726,22 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r @@ -1723,16 +1726,22 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
</p>
<pre class="brush: java;">
// Key: word, value: count
Properties streamsProperties == ...;
// specify the default serdes so we don't need to elsewhere.
streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
StreamsConfig config = new StreamsConfig(streamsProperties);
KStream&lt;String, Integer&gt; wordCounts = ...;
KGroupedStream&lt;String, Integer&gt; groupedStream = wordCounts
.groupByKey(Serialized.with(Serdes.String(), Serdes.Integer()));
.groupByKey();
KTable&lt;String, Integer&gt; aggregated = groupedStream.aggregate(
() -> 0, /* initializer */
(aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
Serdes.Integer(), /* serde for aggregate value */
"aggregated-stream-store" /* state store name */);
(aggKey, newValue, aggValue) -> aggValue + newValue /* adder */
);
</pre>
<p>
@ -1836,8 +1845,10 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r @@ -1836,8 +1845,10 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
() -> 0, /* initializer */
(aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
(aggKey, oldValue, aggValue) -> aggValue - oldValue, /* subtractor */
Serdes.Integer(), /* serde for aggregate value */
"aggregated-table-store" /* state store name */);
Materialized.&lt;String, Integer, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("aggregated-table-store")
.withKeySerde(Serdes.String() /* serde for aggregate key */)
.withValueSerde(Serdes.Long() /* serde for aggregate value */)
);
</pre>
<p>
<b>Impact of <a href=#streams_developer-guide_memory-management_record-cache>record caches</a></b>:
@ -2253,7 +2264,10 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r @@ -2253,7 +2264,10 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
.groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde));
// Create a window state store named "CountsWindowStore" that contains the word counts for every minute
groupedByWord.count(TimeWindows.of(60000), "CountsWindowStore");
groupedByWord.windowedBy(TimeWindows.of(60000))
.count(Materialized.&lt;String, Long, WindowStore&lt;Bytes, byte[]&gt;&gt;as("CountsWindowStore")
withKeySerde(Serdes.String()); // count() sets value serde to Serdes.Long() automatically
);
</pre>
<p>
@ -2396,14 +2410,14 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r @@ -2396,14 +2410,14 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
Topology topology = ...;
ProcessorSupplier processorSuppler = ...;
// Create CustomStoreSupplier for store name the-custom-store
MyCustomStoreSuppler customStoreSupplier = new MyCustomStoreSupplier("the-custom-store");
// Create CustomStoreBuilder for store name the-custom-store
MyCustomStoreBuilder customStoreBuilder = new MyCustomStoreBuilder("the-custom-store");
// Add the source topic
topology.addSource("input", "inputTopic");
// Add a custom processor that reads from the source topic
topology.addProcessor("the-processor", processorSupplier, "input");
// Connect your custom state store to the custom processor above
topology.addStateStore(customStoreSupplier, "the-processor");
topology.addStateStore(customStoreBuilder, "the-processor");
KafkaStreams streams = new KafkaStreams(topology, config);
streams.start();
@ -2478,7 +2492,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r @@ -2478,7 +2492,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
// This call to `count()` creates a state store named "word-count".
// The state store is discoverable and can be queried interactively.
groupedByWord.count("word-count");
groupedByWord.count(Materialized.&ltString, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("word-count"));
// Start an instance of the topology
KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
@ -2835,22 +2849,20 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r @@ -2835,22 +2849,20 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
<p>
For changelog topics you can also override the default configs on a per store basis.
This can be done by using any method overload that has a <code>StateStoreSupplier</code> as a parameter:
This can be done by using any method overload that has a <code>Materialized</code> as a parameter:
</p>
<pre class="brush: java;">
// a map to add topic config
Map&lt;String, String&gt; topicConfig = new HashMap<>();
Map&lt;String, String&gt; topicConfig = new HashMap&lt;&gt;();
topicConfig.put(TopicConfig.SEGMENT_MS_CONFIG, "10000");
StateStoreSupplier supplier = Stores.create("store")
.withKeys(Serdes.String())
.withValues(Serdes.String())
.persistent()
.enableLogging(topicConfig) // pass in the config overrides
.build();
final Materialized&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt; materialized = Materialized.as("store")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String())
.withLoggingEnabled(topicConfig); // pass in the config overrides
groupedStream.count(supplier)
groupedStream.count(materialized)
</pre>
<h4><a id="streams_execute" href="#streams_execute">Executing Your Kafka Streams Application</a></h4>

Loading…
Cancel
Save