<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
< script > <!-- # include virtual = "../js/templateData.js" -- > < / script >
< script id = "streams-template" type = "text/x-handlebars-template" >
< h1 > Kafka Streams API< / h1 >
< h3 style = "max-width: 75rem;" > The easiest way to write mission-critical real-time applications and microservices with all the benefits of Kafka's server-side cluster technology.< / h3 >
< div class = "hero" >
< div class = "hero__diagram" >
< img src = "/{{version}}/images/streams-welcome.png" / >
< / div >
< div class = "hero__cta" >
< a style = "display: none;" href = "/{{version}}/documentation/streams/tutorial" class = "btn" > Write your first app< / a >
< a href = "/{{version}}/documentation/streams/quickstart" class = "btn" > Play with demo app< / a >
< / div >
< / div >
< ul class = "feature-list" >
< li > Write standard Java applications< / li >
< li > Exactly-once processing semantics< / li >
< li > No separate processing cluster required< / li >
< li > Develop on Mac, Linux, Windows< / li >
< li > Elastic, highly scalable, fault-tolerant< / li >
< li > Deploy to containers, VMs, bare metal, cloud< / li >
< li > Equally viable for small, medium, & large use cases< / li >
< li > Fully integrated with Kafka security< / li >
< / ul >
< div class = "cards" >
< a class = "card" href = "/{{version}}/documentation/streams/developer-guide" >
< img class = "card__icon" src = "/{{version}}/images/icons/documentation.png" / >
< img class = "card__icon card__icon--hover" src = "/{{version}}/images/icons/documentation--white.png" / >
< span class = "card__label" > Developer manual< / span >
< / a >
< a style = "display: none;" class = "card" href = "/{{version}}/documentation/streams/tutorial" >
< img class = "card__icon" src = "/{{version}}/images/icons/tutorials.png" / >
< img class = "card__icon card__icon--hover" src = "/{{version}}/images/icons/tutorials--white.png" / >
< span class = "card__label" > Tutorials< / span >
< / a >
< a class = "card" href = "/{{version}}/documentation/streams/core-concepts" >
< img class = "card__icon" src = "/{{version}}/images/icons/architecture.png" / >
< img class = "card__icon card__icon--hover" src = "/{{version}}/images/icons/architecture--white.png" / >
< span class = "card__label" > Concepts< / span >
< / a >
< / div >
< h3 > Hello Kafka Streams< / h3 >
< p > The code example below implements a WordCount application that is elastic, highly scalable, fault-tolerant, stateful, and ready to run in production at large scale.< / p >
< div class = "code-example" >
< div class = "btn-group" >
< a class = "selected b-java-8" data-section = "java-8" > Java 8+< / a >
< a class = "b-java-7" data-section = "java-7" > Java 7< / a >
< a class = "b-scala" data-section = "scala" > Scala< / a >
< / div >
< div class = "code-example__snippet b-java-8 selected" >
< pre class = "brush: java;" >
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import java.util.Arrays;
import java.util.Properties;
/**
 * WordCount demo: reads lines of text from "TextLinesTopic", keeps a count
 * per word, and writes the counts to "WordsWithCountsTopic".
 */
public class WordCountApplication {

    public static void main(final String[] args) throws Exception {
        // Application id, broker address, and default String serdes for keys and values.
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // StreamsBuilder is the builder type imported by this example; its build()
        // result is what gets handed to KafkaStreams below.
        StreamsBuilder builder = new StreamsBuilder();
        KStream< String, String> textLines = builder.stream("TextLinesTopic");
        KTable< String, Long> wordCounts = textLines
            // Lower-case each line and split it on runs of non-word characters.
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            // Re-key every record by the word itself so equal words are grouped together.
            .groupBy((key, word) -> word)
            .count("Counts");
        // Counts are Long values, so the output needs explicit String/Long serdes.
        wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();
    }

}
< / pre >
< / div >
< div class = "code-example__snippet b-java-7" >
< pre class = "brush: java;" >
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueMapper;
import java.util.Arrays;
import java.util.Properties;
/**
 * WordCount demo (Java 7 syntax): reads lines of text from "TextLinesTopic",
 * keeps a count per word, and writes the counts to "WordsWithCountsTopic".
 */
public class WordCountApplication {

    public static void main(final String[] args) throws Exception {
        // Application id, broker address, and default String serdes for keys and values.
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // StreamsBuilder is the builder type imported by this example; its build()
        // result is what gets handed to KafkaStreams below.
        StreamsBuilder builder = new StreamsBuilder();
        KStream< String, String> textLines = builder.stream("TextLinesTopic");
        KTable< String, Long> wordCounts = textLines
            // Lower-case each line and split it on runs of non-word characters.
            .flatMapValues(new ValueMapper< String, Iterable< String> > () {
                @Override
                public Iterable< String> apply(String textLine) {
                    return Arrays.asList(textLine.toLowerCase().split("\\W+"));
                }
            })
            // Re-key every record by the word itself so equal words are grouped together.
            .groupBy(new KeyValueMapper< String, String, String> () {
                @Override
                public String apply(String key, String word) {
                    return word;
                }
            })
            .count("Counts");
        // Counts are Long values, so the output needs explicit String/Long serdes.
        wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();
    }

}
< / pre >
< / div >
< div class = "code-example__snippet b-scala" >
< pre class = "brush: scala;" >
import java.lang.Long
import java.util.Properties
import java.util.concurrent.TimeUnit
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable}
import scala.collection.JavaConverters.asJavaIterableConverter
// WordCount demo: reads lines of text from "TextLinesTopic", keeps a count
// per word, and writes the counts to "WordsWithCountsTopic".
object WordCountApplication {

  def main(args: Array[String]): Unit = {
    // Application id, broker address, and default String serdes for keys and values.
    val config = new Properties()
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)

    val builder = new KStreamBuilder()
    val textLines: KStream[String, String] = builder.stream("TextLinesTopic")
    val wordCounts: KTable[String, Long] = textLines
      // Lower-case each line and split it on runs of non-word characters.
      .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
      // Re-key every record by the word itself so equal words are grouped together.
      .groupBy((_, word) => word)
      .count("Counts")
    // Counts are Long values, so the output needs explicit String/Long serdes.
    wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic")

    val streams = new KafkaStreams(builder, config)
    streams.start()

    // On JVM shutdown, close the streams instance with a 10 second timeout.
    val closeOnExit = new Thread(() => {
      streams.close(10, TimeUnit.SECONDS)
    })
    Runtime.getRuntime.addShutdownHook(closeOnExit)
  }

}
< / pre >
< / div >
< / div >
< div class = "pagination" >
< a href = "#" class = "pagination__btn pagination__btn__prev pagination__btn--disabled" > Previous< / a >
< a href = "/{{version}}/documentation/streams/quickstart" class = "pagination__btn pagination__btn__next" > Next< / a >
< / div >
< / script >
<!-- #include virtual="../../includes/_header.htm" -->
<!-- #include virtual="../../includes/_top.htm" -->
< div class = "content documentation documentation--current" >
<!-- #include virtual="../../includes/_nav.htm" -->
< div class = "right" >
<!-- #include virtual="../../includes/_docs_banner.htm" -->
< ul class = "breadcrumbs" >
< li > < a href = "/documentation" > Documentation< / a > < / li >
< / ul >
< div class = "p-streams" > < / div >
< / div >
< / div >
<!-- #include virtual="../../includes/_footer.htm" -->
< script >
$(function() {
    // Mark the Streams entry in the nav as selected
    $('.b-nav__streams').addClass('selected');

    // Toggle the expanded class on the parent of the docs nav item
    $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');

    // Language tabs: clicking a button selects it together with its matching snippet
    $('.btn-group a').on('click', function() {
        // data-section on the clicked button names the snippet class suffix (e.g. "java-8")
        var section = $(this).data('section');

        // Clear the current selection on every snippet and button first
        $('.code-example__snippet, .btn-group a').removeClass('selected');

        // ".b-<section>" matches both the button and the snippet for that language
        $('.b-' + section).addClass('selected');
    });
});
< / script >