From 6626b058c7893ce1192456b3fc7617f98da6f3cc Mon Sep 17 00:00:00 2001 From: Vahid Hashemian Date: Tue, 13 Dec 2016 09:44:11 -0800 Subject: [PATCH] MINOR: Fix Streams examples in documentation Performed minor cleanup and escaped `<` and `>` so code examples are shown correctly in the browser. Author: Vahid Hashemian Reviewers: Matthias J. Sax, Guozhang Wang Closes #2247 from vahidhashemian/doc/fix_streams_doc --- docs/streams.html | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/docs/streams.html b/docs/streams.html index 74620ec8cc7..306b2a50a2c 100644 --- a/docs/streams.html +++ b/docs/streams.html @@ -279,8 +279,8 @@ from a single topic).
     KStreamBuilder builder = new KStreamBuilder();
 
-    KStream source1 = builder.stream("topic1", "topic2");
-    KTable source2 = builder.table("topic3", "stateStoreName");
+    KStream<String, GenericRecord> source1 = builder.stream("topic1", "topic2");
+    KTable<String, GenericRecord> source2 = builder.table("topic3", "stateStoreName");
 
Windowing a stream
@@ -301,7 +301,7 @@ A join operation merges two streams based on the keys of their data recor Depending on the operands the following join operations are supported: inner joins, outer joins and left joins. Their semantics are similar to the corresponding operators in relational databases. -a +
Transform a stream

@@ -323,12 +323,12 @@ where users can usually pass a customized function to these functions as a param

     // written in Java 8+, using lambda expressions
-    KStream mapped = source1.mapValue(record -> record.get("category"));
+    KStream<String, GenericRecord> mapped = source1.mapValue(record -> record.get("category"));
 

Stateless transformations, by definition, do not depend on any state for processing, and hence implementation-wise -they do not require a state store associated with the stream processor; Stateful transformations, on the other hand, +they do not require a state store associated with the stream processor; stateful transformations, on the other hand, require accessing an associated state for processing and producing outputs. For example, in join and aggregate operations, a windowing state is usually used to store all the received records within the defined window boundary so far. The operators can then access these accumulated records in the store and compute @@ -337,14 +337,14 @@ based on them.

     // written in Java 8+, using lambda expressions
-    KTable, Long> counts = source1.groupByKey().aggregate(
+    KTable<Windowed<String>, Long> counts = source1.groupByKey().aggregate(
         () -> 0L,  // initial value
         (aggKey, value, aggregate) -> aggregate + 1L,   // aggregating value
         TimeWindows.of("counts", 5000L).advanceBy(1000L), // intervals in milliseconds
         Serdes.Long() // serde for aggregated value
     );
 
-    KStream joined = source1.leftJoin(source2,
+    KStream<String, String> joined = source1.leftJoin(source2,
         (record1, record2) -> record1.get("user") + "-" + record2.get("region");
     );
 
@@ -369,7 +369,7 @@ Kafka Streams provides a convenience method called through: // // joined.to("topic4"); // materialized = builder.stream("topic4"); - KStream materialized = joined.through("topic4"); + KStream<String, String> materialized = joined.through("topic4");