In this guide we will start from scratch, setting up your own project to write a stream processing application using Kafka Streams.
It is highly recommended that you read the <a href="/{{version}}/documentation/streams/quickstart">quickstart</a> first, which shows how to run a Streams application written in Kafka Streams, if you have not done so already.
</p>
<h4><aid="tutorial_maven_setup"href="#tutorial_maven_setup">Setting up a Maven Project</a></h4>
<p>
We are going to use a Kafka Streams Maven Archetype for creating a Streams project structure with the following commands:
</p>
<preclass="brush: bash;">
mvn archetype:generate \
-DarchetypeGroupId=org.apache.kafka \
-DarchetypeArtifactId=streams-quickstart-java \
-DarchetypeVersion={{fullDotVersion}} \
-DgroupId=streams.examples \
-DartifactId=streams.examples \
-Dversion=0.1 \
-Dpackage=myapps
</pre>
<p>
You can use a different value for the <code>groupId</code>, <code>artifactId</code>, and <code>package</code> parameters if you like.
Assuming the above parameter values are used, this command will create a project skeleton for you.
The <code>pom.xml</code> file included in the project already has the Streams dependency defined,
and there are already several example programs written with the Streams library under <code>src/main/java</code>.
Since we are going to start writing such programs from scratch, we can now delete these examples:
</p>
<preclass="brush: bash;">
> cd streams-quickstart
> rm src/main/java/myapps/*.java
</pre>
<h4><aid="tutorial_code_pipe"href="#tutorial_code_pipe">Writing a first Streams application: Pipe</a></h4>
<p>
It's coding time now! Feel free to open your favorite IDE and import this Maven project, or simply open a text editor and create a Java file under <code>src/main/java</code>.
Let's name it <code>Pipe.java</code>:
</p>
<preclass="brush: java;">
package myapps;
public class Pipe {
public static void main(String[] args) throws Exception {
}
}
</pre>
<p>
We are going to fill in the <code>main</code> function to write this pipe program. Note that we will not list the import statements as we go, since IDEs can usually add them automatically.
However, if you are using a text editor you need to add the imports manually, and at the end of this section we will show the complete code snippet with the import statements.
</p>
<p>
The first step to write a Streams application is to create a <code>java.util.Properties</code> map to specify different Streams execution configuration values as defined in <code>StreamsConfig</code>.
A couple of important configuration values you need to set are: <code>StreamsConfig.BOOTSTRAP_SERVERS_CONFIG</code>, which specifies a list of host/port pairs to use for establishing the initial connection to the Kafka cluster,
and <code>StreamsConfig.APPLICATION_ID_CONFIG</code>, which gives the unique identifier of your Streams application to distinguish it from other applications talking to the same Kafka cluster:
</p>
<pre class="brush: java;">
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // assuming that the Kafka broker this application is talking to runs on local machine with port 9092
</pre>
<p>
In addition, you can customize other configurations in the same map, for example, default serialization and deserialization libraries for the record key-value pairs.
</p>
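<p>
A minimal sketch of these additional settings, together with a pipe topology wired to the topics discussed below (the <code>String</code> serdes and the topic names <code>streams-plaintext-input</code> and <code>streams-pipe-output</code> follow the surrounding text; imports are omitted as noted above):
</p>
<pre class="brush: java;">
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// build a topology that simply pipes records from the input topic to the output topic
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("streams-plaintext-input").to("streams-pipe-output");
final Topology topology = builder.build();

// print a human-readable description of the topology's processor nodes
System.out.println(topology.describe());
</pre>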
<p>
The printed topology description shows two processor nodes: a source node <code>KSTREAM-SOURCE-0000000000</code> and a sink node <code>KSTREAM-SINK-0000000001</code>.
<code>KSTREAM-SOURCE-0000000000</code> continuously reads records from the Kafka topic <code>streams-plaintext-input</code> and pipes them to its downstream node <code>KSTREAM-SINK-0000000001</code>;
<code>KSTREAM-SINK-0000000001</code> will write each of its received records in order to another Kafka topic, <code>streams-pipe-output</code>
(the <code>--></code> and <code><--</code> arrows in the description indicate the downstream and upstream processor nodes of a given node, i.e. its "children" and "parents" within the topology graph).
It also illustrates that this simple topology has no global state stores associated with it (we will talk about state stores more in the following sections).
</p>
<p>
Note that we can always describe the topology as we did above at any given point while we are building it in the code, so as a user you can interactively "try and taste" your computational logic defined in the topology until you are happy with it.
Suppose we are done with this simple topology that just pipes data from one Kafka topic to another in an endless streaming manner;
we can now construct the Streams client with the two components we have just created above: the configuration map and the topology object
(one can also construct a <code>StreamsConfig</code> object from the <code>props</code> map and then pass that object to the constructor;
<code>KafkaStreams</code> has overloaded constructors that take either type).
</p>
<preclass="brush: java;">
final KafkaStreams streams = new KafkaStreams(topology, props);
</pre>
<p>
By calling its <code>start()</code> function we can trigger the execution of this client.
The execution won't stop until <code>close()</code> is called on this client.
We can, for example, add a shutdown hook with a countdown latch to capture a user interrupt and close the client upon terminating this program:
</p>
<preclass="brush: java;">
final CountDownLatch latch = new CountDownLatch(1);
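<h4><a id="tutorial_code_linesplit" href="#tutorial_code_linesplit">Writing a second Streams application: Line Split</a></h4>
<p>
Following the same structure as <code>Pipe.java</code>, we can create a <code>LineSplit.java</code> class whose topology splits each incoming text line into words with a <code>flatMapValues</code> operator before writing them out.
A minimal sketch of the core topology code (the output topic name <code>streams-linesplit-output</code> and the split pattern are illustrative assumptions):
</p>
<pre class="brush: java;">
final StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> source = builder.stream("streams-plaintext-input");
// split each text line, by non-word characters, into words (the output topic name is an assumption)
source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
      .to("streams-linesplit-output");

System.out.println(builder.build().describe());
</pre>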
<p>
If we describe this augmented topology, we will see that a new processor node <code>KSTREAM-FLATMAPVALUES-0000000001</code> has been injected into the topology between the original source and sink nodes.
It takes the source node as its parent and the sink node as its child.
In other words, each record fetched by the source node will first traverse to the newly added <code>KSTREAM-FLATMAPVALUES-0000000001</code> node to be processed,
and one or more new records will be generated as a result. These records will then continue traversing down to the sink node to be written back to Kafka.
Note this processor node is "stateless" as it is not associated with any stores (i.e. <code>(stores: [])</code>).
</p>
<p>
The complete code looks like this (assuming lambda expressions are used).
</p>
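<p>
A sketch of the full <code>LineSplit</code> program under the same assumptions (the application id <code>streams-linesplit</code> and the output topic <code>streams-linesplit-output</code> are illustrative values):
</p>
<pre class="brush: java;">
package myapps;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class LineSplit {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");   // illustrative application id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> source = builder.stream("streams-plaintext-input");
        // split each text line into words and pipe them to the output topic (topic name is illustrative)
        source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
              .to("streams-linesplit-output");

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach a shutdown handler to catch control-c and close the client cleanly
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
</pre>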
<h4><aid="tutorial_code_wordcount"href="#tutorial_code_wordcount">Writing a third Streams application: Wordcount</a></h4>
<p>
Let's now take a step further to add some "stateful" computations to the topology by counting the occurrence of the words split from the source text stream.
Following similar steps let's create another program based on the <code>LineSplit.java</code> class:
</p>
<preclass="brush: java;">
public class WordCount {
public static void main(String[] args) throws Exception {
<p>
In order to count the words we can first modify the <code>flatMapValues</code> operator to treat all of them as lower case (assuming lambda expressions are used).
</p>
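<p>
A sketch of the modified operator (the lower-casing locale and the split pattern are illustrative choices):
</p>
<pre class="brush: java;">
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")));
</pre>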
<p>
In order to do the counting aggregation we have to first specify that we want to key the stream on the value string, i.e. the lower-cased word, with a <code>groupBy</code> operator.
This operator generates a new grouped stream, which can then be aggregated by a <code>count</code> operator that maintains a running count for each of the grouped keys.
</p>
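<p>
A sketch of these two steps, producing a <code>KTable</code> of running counts (the store name <code>Counts</code> follows the description below):
</p>
<pre class="brush: java;">
KTable<String, Long> counts =
       source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
             .groupBy((key, value) -> value)
             .count("Counts");   // "Counts" names the state store that holds the running counts
</pre>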
<p>
Note that the <code>count</code> operator has a <code>String</code> typed parameter, <code>Counts</code>, which names the state store
that holds the running counts being updated as more records are piped and processed from the source Kafka topic.
This <code>Counts</code> store can be queried in real-time, with details described in the <a href="/{{version}}/documentation/streams/developer-guide#streams_interactive_queries">Developer Manual</a>.
</p>
<p>
We can also write the <code>counts</code> KTable's changelog stream back into another Kafka topic, say <code>streams-wordcount-output</code>.
Note that this time the value type is no longer <code>String</code> but <code>Long</code>, so the default serialization classes are not viable for writing it to Kafka anymore.
We need to provide overridden serialization methods for <code>Long</code> types, otherwise a runtime exception will be thrown.
</p>
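<p>
One way to do this is to override the value serde when writing the stream out (a sketch using the <code>Produced</code> helper):
</p>
<pre class="brush: java;">
// write the changelog stream of the KTable to the output topic, overriding the value serde with Long
counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
</pre>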
<p>
If we then describe this augmented topology, we will see that it now contains two disconnected sub-topologies.
The first sub-topology's sink node <code>KSTREAM-SINK-0000000004</code> will write to a repartition topic <code>Counts-repartition</code>,
which will be read by the second sub-topology's source node <code>KSTREAM-SOURCE-0000000006</code>.
The repartition topic is used to "shuffle" the source stream by its aggregation key, which is in this case the value string.
In addition, inside the first sub-topology a stateless <code>KSTREAM-FILTER-0000000005</code> node is injected between the grouping <code>KSTREAM-KEY-SELECT-0000000002</code> node and the sink node to filter out any intermediate record whose aggregate key is empty.
</p>
<p>
In the second sub-topology, the aggregation node <code>KSTREAM-AGGREGATE-0000000003</code> is associated with a state store named <code>Counts</code> (the name is specified by the user in the <code>count</code> operator).
Upon receiving each record from its upstream source node, the aggregation processor will first query its associated <code>Counts</code> store to get the current count for that key, augment it by one, and then write the new count back to the store.
Each updated count for the key will also be piped downstream to the <code>KTABLE-TOSTREAM-0000000007</code> node, which interprets this update stream as a record stream before further piping it to the sink node <code>KSTREAM-SINK-0000000008</code> for writing back to Kafka.
</p>
<p>
The complete code looks like this (assuming lambda expressions are used):