From cc84686a4aa24e541f7ca5ee9dcb0dea0ddbd79a Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Mon, 2 Oct 2017 13:20:49 -0700 Subject: [PATCH] MINOR: additional kip-182 doc updates Author: Damian Guy Reviewers: Michael G. Noll , Bill Bejeck , Matthias J. Sax , Ismael Juma Closes #3971 from dguy/kip-182-docs --- docs/streams/developer-guide.html | 244 ++++++++++++++++-------------- 1 file changed, 128 insertions(+), 116 deletions(-) diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html index a064a5de39c..842325b528e 100644 --- a/docs/streams/developer-guide.html +++ b/docs/streams/developer-guide.html @@ -1383,65 +1383,72 @@ Note that in the WordCountProcessor implementation, users need to r // Java 8+ examples, using lambda expressions // Aggregating with time-based windowing (here: with 5-minute tumbling windows) - KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.aggregate( - () -> 0L, /* initializer */ - (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */ - TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */ - Serdes.Long(), /* serde for aggregate value */ - "time-windowed-aggregated-stream-store" /* state store name */); + KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream + .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */ + .aggregate( + () -> 0L, /* initializer */ + (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */ + Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store") /* state store name */ + .withValueSerde(Serdes.Long())); /* serde for aggregate value */ + // Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes) - KTable<Windowed<String>, Long> sessionizedAggregatedStream = groupedStream.aggregate( - () -> 0L, /* initializer */ - (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */ - (aggKey, leftAggValue, rightAggValue) -> leftAggValue + rightAggValue, /* session merger */ - SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */ - Serdes.Long(), /* serde for aggregate value */ - "sessionized-aggregated-stream-store" /* state store name */); + KTable<Windowed<String>, Long> sessionizedAggregatedStream = groupedStream + .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */ + .aggregate( + () -> 0L, /* initializer */ + (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */ + (aggKey, leftAggValue, rightAggValue) -> leftAggValue + rightAggValue, /* session merger */ + Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("sessionized-aggregated-stream-store") /* state store name */ + .withValueSerde(Serdes.Long())); /* serde for aggregate value */ // Java 7 examples // Aggregating with time-based windowing (here: with 5-minute tumbling windows) - KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.aggregate( - new Initializer<Long>() { /* initializer */ - @Override - public Long apply() { - return 0L; - } - }, - new Aggregator<String, Long, Long>() { /* adder */ - @Override - public Long apply(String aggKey, Long newValue, Long aggValue) { - return aggValue + newValue; - } - }, - TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */ - Serdes.Long(), /* serde for aggregate value */ - "time-windowed-aggregated-stream-store" /* state store name */); + KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream + .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */ + .aggregate( + new Initializer<Long>() { /* initializer */ + @Override + public Long apply() { + return 0L; + } + }, + new Aggregator<String, Long, Long>() { /* adder */ + @Override + public Long apply(String aggKey, Long newValue, Long aggValue) { + return aggValue + newValue; + } + }, + Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store") /* state store name */ + .withValueSerde(Serdes.Long()) /* serde for aggregate value */ + ); // Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes) - KTable<Windowed<String>, Long> sessionizedAggregatedStream = groupedStream.aggregate( - new Initializer<Long>() { /* initializer */ - @Override - public Long apply() { - return 0L; - } - }, - new Aggregator<String, Long, Long>() { /* adder */ - @Override - public Long apply(String aggKey, Long newValue, Long aggValue) { - return aggValue + newValue; - } - }, - new Merger<String, Long>() { /* session merger */ - @Override - public Long apply(String aggKey, Long leftAggValue, Long rightAggValue) { - return rightAggValue + leftAggValue; - } - }, - SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */ - Serdes.Long(), /* serde for aggregate value */ - "sessionized-aggregated-stream-store" /* state store name */); + KTable<Windowed<String>, Long> sessionizedAggregatedStream = groupedStream + .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */ + .aggregate( + new Initializer<Long>() { /* initializer */ + @Override + public Long apply() { + return 0L; + } + }, + new Aggregator<String, Long, Long>() { /* adder */ + @Override + public Long apply(String aggKey, Long newValue, Long aggValue) { + return aggValue + newValue; + } + }, + new Merger<String, Long>() { /* session merger */ + @Override + public Long apply(String aggKey, Long leftAggValue, Long rightAggValue) { + return rightAggValue + leftAggValue; + } + }, + Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("sessionized-aggregated-stream-store") /* state store name */ + .withValueSerde(Serdes.Long()) /* serde for aggregate value */ + );

@@ -1478,12 +1485,10 @@ Note that in the WordCountProcessor implementation, users need to r KGroupedTable<String, Long> groupedTable = ...; // Counting a KGroupedStream - KTable<String, Long> aggregatedStream = groupedStream.count( - "counted-stream-store" /* state store name */); + KTable<String, Long> aggregatedStream = groupedStream.count(); // Counting a KGroupedTable - KTable<String, Long> aggregatedTable = groupedTable.count( - "counted-table-store" /* state store name */); + KTable<String, Long> aggregatedTable = groupedTable.count();

Detailed behavior for KGroupedStream: @@ -1518,14 +1523,14 @@ Note that in the WordCountProcessor implementation, users need to r KGroupedStream<String, Long> groupedStream = ...; // Counting a KGroupedStream with time-based windowing (here: with 5-minute tumbling windows) - KTable<Windowed<String>, Long> aggregatedStream = groupedStream.count( - TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */ - "time-windowed-counted-stream-store" /* state store name */); + KTable<Windowed<String>, Long> aggregatedStream = groupedStream + .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */ + .count(); // Counting a KGroupedStream with session-based windowing (here: with 5-minute inactivity gaps) - KTable<Windowed<String>, Long> aggregatedStream = groupedStream.count( - SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */ - "sessionized-counted-stream-store" /* state store name */); + KTable<Windowed<String>, Long> aggregatedStream = groupedStream + .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */ + .count();

Detailed behavior: @@ -1561,14 +1566,14 @@ Note that in the WordCountProcessor implementation, users need to r // Reducing a KGroupedStream KTable<String, Long> aggregatedStream = groupedStream.reduce( - (aggValue, newValue) -> aggValue + newValue, /* adder */ - "reduced-stream-store" /* state store name */); + (aggValue, newValue) -> aggValue + newValue /* adder */ + ); // Reducing a KGroupedTable KTable<String, Long> aggregatedTable = groupedTable.reduce( (aggValue, newValue) -> aggValue + newValue, /* adder */ - (aggValue, oldValue) -> aggValue - oldValue, /* subtractor */ - "reduced-table-store" /* state store name */); + (aggValue, oldValue) -> aggValue - oldValue /* subtractor */ + ); // Java 7 examples @@ -1580,8 +1585,8 @@ Note that in the WordCountProcessor implementation, users need to r public Long apply(Long aggValue, Long newValue) { return aggValue + newValue; } - }, - "reduced-stream-store" /* state store name */); + } + ); // Reducing a KGroupedTable KTable<String, Long> aggregatedTable = groupedTable.reduce( @@ -1596,8 +1601,8 @@ Note that in the WordCountProcessor implementation, users need to r public Long apply(Long aggValue, Long oldValue) { return aggValue - oldValue; } - }, - "reduced-table-store" /* state store name */); + } + );

Detailed behavior for KGroupedStream: @@ -1659,41 +1664,39 @@ Note that in the WordCountProcessor implementation, users need to r // Java 8+ examples, using lambda expressions // Aggregating with time-based windowing (here: with 5-minute tumbling windows) - KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.reduce( - (aggValue, newValue) -> aggValue + newValue, /* adder */ - TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */ - "time-windowed-reduced-stream-store" /* state store name */); + KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream + .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */ + .reduce((aggValue, newValue) -> aggValue + newValue /* adder */); // Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes) - KTable<Windowed<String>, Long> sessionzedAggregatedStream = groupedStream.reduce( - (aggValue, newValue) -> aggValue + newValue, /* adder */ - SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */ - "sessionized-reduced-stream-store" /* state store name */); + KTable<Windowed<String>, Long> sessionzedAggregatedStream = groupedStream + .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */ + .reduce((aggValue, newValue) -> aggValue + newValue); /* adder */ // Java 7 examples // Aggregating with time-based windowing (here: with 5-minute tumbling windows) - KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.reduce( - new Reducer<Long>() { /* adder */ - @Override - public Long apply(Long aggValue, Long newValue) { - return aggValue + newValue; - } - }, - TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */ - "time-windowed-reduced-stream-store" /* state store name */); + KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream + .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */ + .reduce( + new Reducer<Long>() { /* adder */ + @Override + public Long apply(Long aggValue, Long newValue) { + return aggValue + newValue; + } + }); // Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes) - KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.reduce( - new Reducer<Long>() { /* adder */ - @Override - public Long apply(Long aggValue, Long newValue) { - return aggValue + newValue; - } - }, - SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */ - "sessionized-reduced-stream-store" /* state store name */); + KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream + .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */ + .reduce( + new Reducer<Long>() { /* adder */ + @Override + public Long apply(Long aggValue, Long newValue) { + return aggValue + newValue; + } + });

@@ -1723,16 +1726,22 @@ Note that in the WordCountProcessor implementation, users need to r

         // Key: word, value: count
+        Properties streamsProperties == ...;
+
+        // specify the default serdes so we don't need to elsewhere.
+        streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        StreamsConfig config = new StreamsConfig(streamsProperties);
+
         KStream<String, Integer> wordCounts = ...;
 
         KGroupedStream<String, Integer> groupedStream = wordCounts
-            .groupByKey(Serialized.with(Serdes.String(), Serdes.Integer()));
+            .groupByKey();
 
         KTable<String, Integer> aggregated = groupedStream.aggregate(
             () -> 0, /* initializer */
-            (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
-            Serdes.Integer(), /* serde for aggregate value */
-            "aggregated-stream-store" /* state store name */);
+            (aggKey, newValue, aggValue) -> aggValue + newValue /* adder */
+        );
     

@@ -1836,8 +1845,10 @@ Note that in the WordCountProcessor implementation, users need to r () -> 0, /* initializer */ (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */ (aggKey, oldValue, aggValue) -> aggValue - oldValue, /* subtractor */ - Serdes.Integer(), /* serde for aggregate value */ - "aggregated-table-store" /* state store name */); + Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("aggregated-table-store") + .withKeySerde(Serdes.String() /* serde for aggregate key */) + .withValueSerde(Serdes.Long() /* serde for aggregate value */) + );

Impact of record caches: @@ -2253,7 +2264,10 @@ Note that in the WordCountProcessor implementation, users need to r .groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde)); // Create a window state store named "CountsWindowStore" that contains the word counts for every minute - groupedByWord.count(TimeWindows.of(60000), "CountsWindowStore"); + groupedByWord.windowedBy(TimeWindows.of(60000)) + .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("CountsWindowStore") + withKeySerde(Serdes.String()); // count() sets value serde to Serdes.Long() automatically + );

@@ -2396,14 +2410,14 @@ Note that in the WordCountProcessor implementation, users need to r Topology topology = ...; ProcessorSupplier processorSuppler = ...; - // Create CustomStoreSupplier for store name the-custom-store - MyCustomStoreSuppler customStoreSupplier = new MyCustomStoreSupplier("the-custom-store"); + // Create CustomStoreBuilder for store name the-custom-store + MyCustomStoreBuilder customStoreBuilder = new MyCustomStoreBuilder("the-custom-store"); // Add the source topic topology.addSource("input", "inputTopic"); // Add a custom processor that reads from the source topic topology.addProcessor("the-processor", processorSupplier, "input"); // Connect your custom state store to the custom processor above - topology.addStateStore(customStoreSupplier, "the-processor"); + topology.addStateStore(customStoreBuilder, "the-processor"); KafkaStreams streams = new KafkaStreams(topology, config); streams.start(); @@ -2478,7 +2492,7 @@ Note that in the WordCountProcessor implementation, users need to r // This call to `count()` creates a state store named "word-count". // The state store is discoverable and can be queried interactively. - groupedByWord.count("word-count"); + groupedByWord.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("word-count")); // Start an instance of the topology KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration); @@ -2835,22 +2849,20 @@ Note that in the WordCountProcessor implementation, users need to r

For changelog topics you can also override the default configs on a per store basis. - This can be done by using any method overload that has a StateStoreSupplier as a parameter: + This can be done by using any method overload that has a Materialized as a parameter:

         // a map to add topic config
-        Map<String, String> topicConfig = new HashMap<>();
+        Map<String, String> topicConfig = new HashMap<>();
         topicConfig.put(TopicConfig.SEGMENT_MS_CONFIG, "10000");
 
-        StateStoreSupplier supplier = Stores.create("store")
-                .withKeys(Serdes.String())
-                .withValues(Serdes.String())
-                .persistent()
-                .enableLogging(topicConfig) // pass in the config overrides
-                .build();
-        
-        groupedStream.count(supplier)
+        final Materialized<String, Long, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("store")
+            .withKeySerde(Serdes.String())
+            .withValueSerde(Serdes.String())
+            .withLoggingEnabled(topicConfig); // pass in the config overrides
+
+        groupedStream.count(materialized)
     

Executing Your Kafka Streams Application