You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
242 lines
11 KiB
242 lines
11 KiB
<!-- |
|
Licensed to the Apache Software Foundation (ASF) under one or more |
|
contributor license agreements. See the NOTICE file distributed with |
|
this work for additional information regarding copyright ownership. |
|
The ASF licenses this file to You under the Apache License, Version 2.0 |
|
(the "License"); you may not use this file except in compliance with |
|
the License. You may obtain a copy of the License at |
|
|
|
http://www.apache.org/licenses/LICENSE-2.0 |
|
|
|
Unless required by applicable law or agreed to in writing, software |
|
distributed under the License is distributed on an "AS IS" BASIS, |
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
See the License for the specific language governing permissions and |
|
limitations under the License. |
|
--> |
|
|
|
<script><!--#include virtual="../js/templateData.js" --></script> |
|
|
|
<script id="streams-template" type="text/x-handlebars-template"> |
|
<h1>Kafka Streams API</h1> |
|
|
|
<h3 style="max-width: 75rem;">The easiest way to write mission-critical real-time applications and microservices with all the benefits of Kafka's server-side cluster technology.</h3> |
|
|
|
<div class="hero"> |
|
<div class="hero__diagram"> |
|
<img src="/{{version}}/images/streams-welcome.png" /> |
|
</div> |
|
<div class="hero__cta"> |
|
<a style="display: none;" href="/{{version}}/documentation/streams/tutorial" class="btn">Write your first app</a> |
|
<a href="/{{version}}/documentation/streams/quickstart" class="btn">Play with demo app</a> |
|
</div> |
|
</div> |
|
|
|
<ul class="feature-list"> |
|
<li>Write standard Java applications</li> |
|
<li>Exactly-once processing semantics</li> |
|
<li>No seperate processing cluster required</li> |
|
<li>Develop on Mac, Linux, Windows</li> |
|
<li>Elastic, highly scalable, fault-tolerant</li> |
|
<li>Deploy to containers, VMs, bare metal, cloud</li> |
|
<li>Equally viable for small, medium, & large use cases</li> |
|
<li>Fully integrated with Kafka security</li> |
|
</ul> |
|
|
|
<div class="cards"> |
|
<a class="card" href="/{{version}}/documentation/streams/developer-guide"> |
|
<img class="card__icon" src="/{{version}}/images/icons/documentation.png" /> |
|
<img class="card__icon card__icon--hover" src="/{{version}}/images/icons/documentation--white.png" /> |
|
<span class="card__label">Developer manual</span> |
|
</a> |
|
<a style="display: none;" class="card" href="/{{version}}/documentation/streams/tutorial"> |
|
<img class="card__icon" src="/{{version}}/images/icons/tutorials.png" /> |
|
<img class="card__icon card__icon--hover" src="/{{version}}/images/icons/tutorials--white.png" /> |
|
<span class="card__label">Tutorials</span> |
|
</a> |
|
<a class="card" href="/{{version}}/documentation/streams/core-concepts"> |
|
<img class="card__icon" src="/{{version}}/images/icons/architecture.png" /> |
|
<img class="card__icon card__icon--hover" src="/{{version}}/images/icons/architecture--white.png" /> |
|
<span class="card__label">Concepts</span> |
|
</a> |
|
</div> |
|
|
|
<h3>Hello Kafka Streams</h3> |
|
<p>The code example below implements a WordCount application that is elastic, highly scalable, fault-tolerant, stateful, and ready to run in production at large scale</p> |
|
|
|
<div class="code-example"> |
|
<div class="btn-group"> |
|
<a class="selected b-java-8" data-section="java-8">Java 8+</a> |
|
<a class="b-java-7" data-section="java-7">Java 7</a> |
|
<a class="b-scala" data-section="scala">Scala</a> |
|
</div> |
|
|
|
<div class="code-example__snippet b-java-8 selected"> |
|
<pre class="brush: java;"> |
|
import org.apache.kafka.common.serialization.Serdes; |
|
import org.apache.kafka.streams.KafkaStreams; |
|
import org.apache.kafka.streams.StreamsBuilder; |
|
import org.apache.kafka.streams.StreamsConfig; |
|
import org.apache.kafka.streams.kstream.KStream; |
|
import org.apache.kafka.streams.kstream.KTable; |
|
|
|
import java.util.Arrays; |
|
import java.util.Properties; |
|
|
|
public class WordCountApplication { |
|
|
|
public static void main(final String[] args) throws Exception { |
|
Properties config = new Properties(); |
|
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application"); |
|
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); |
|
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); |
|
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); |
|
|
|
KStreamBuilder builder = new KStreamBuilder(); |
|
KStream<String, String> textLines = builder.stream("TextLinesTopic"); |
|
KTable<String, Long> wordCounts = textLines |
|
.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+"))) |
|
.groupBy((key, word) -> word) |
|
.count("Counts"); |
|
wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic"); |
|
|
|
KafkaStreams streams = new KafkaStreams(builder.build(), config); |
|
streams.start(); |
|
} |
|
|
|
} |
|
</pre> |
|
</div> |
|
|
|
<div class="code-example__snippet b-java-7"> |
|
<pre class="brush: java;"> |
|
import org.apache.kafka.common.serialization.Serdes; |
|
import org.apache.kafka.streams.KafkaStreams; |
|
import org.apache.kafka.streams.StreamsBuilder; |
|
import org.apache.kafka.streams.StreamsConfig; |
|
import org.apache.kafka.streams.kstream.KStream; |
|
import org.apache.kafka.streams.kstream.KTable; |
|
import org.apache.kafka.streams.kstream.KeyValueMapper; |
|
import org.apache.kafka.streams.kstream.ValueMapper; |
|
|
|
import java.util.Arrays; |
|
import java.util.Properties; |
|
|
|
public class WordCountApplication { |
|
|
|
public static void main(final String[] args) throws Exception { |
|
Properties config = new Properties(); |
|
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application"); |
|
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); |
|
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); |
|
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); |
|
|
|
KStreamBuilder builder = new KStreamBuilder(); |
|
KStream<String, String> textLines = builder.stream("TextLinesTopic"); |
|
KTable<String, Long> wordCounts = textLines |
|
.flatMapValues(new ValueMapper<String, Iterable<String>>() { |
|
@Override |
|
public Iterable<String> apply(String textLine) { |
|
return Arrays.asList(textLine.toLowerCase().split("\\W+")); |
|
} |
|
}) |
|
.groupBy(new KeyValueMapper<String, String, String>() { |
|
@Override |
|
public String apply(String key, String word) { |
|
return word; |
|
} |
|
}) |
|
.count("Counts"); |
|
wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic"); |
|
|
|
KafkaStreams streams = new KafkaStreams(builder, config); |
|
streams.start(); |
|
} |
|
|
|
} |
|
</pre> |
|
</div> |
|
|
|
<div class="code-example__snippet b-scala"> |
|
<pre class="brush: scala;"> |
|
import java.lang.Long |
|
import java.util.Properties |
|
import java.util.concurrent.TimeUnit |
|
|
|
import org.apache.kafka.common.serialization._ |
|
import org.apache.kafka.streams._ |
|
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable} |
|
|
|
import scala.collection.JavaConverters.asJavaIterableConverter |
|
|
|
object WordCountApplication { |
|
|
|
def main(args: Array[String]) { |
|
val config: Properties = { |
|
val p = new Properties() |
|
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application") |
|
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092") |
|
p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass) |
|
p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass) |
|
p |
|
} |
|
|
|
val builder: KStreamBuilder = new KStreamBuilder() |
|
val textLines: KStream[String, String] = builder.stream("TextLinesTopic") |
|
val wordCounts: KTable[String, Long] = textLines |
|
.flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava) |
|
.groupBy((_, word) => word) |
|
.count("Counts") |
|
wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic") |
|
|
|
val streams: KafkaStreams = new KafkaStreams(builder, config) |
|
streams.start() |
|
|
|
Runtime.getRuntime.addShutdownHook(new Thread(() => { |
|
streams.close(10, TimeUnit.SECONDS) |
|
})) |
|
} |
|
|
|
} |
|
</pre> |
|
</div> |
|
</div> |
|
|
|
|
|
|
|
<div class="pagination"> |
|
<a href="#" class="pagination__btn pagination__btn__prev pagination__btn--disabled">Previous</a> |
|
<a href="/{{version}}/documentation/streams/quickstart" class="pagination__btn pagination__btn__next">Next</a> |
|
</div> |
|
</script> |
|
|
|
<!--#include virtual="../../includes/_header.htm" --> |
|
<!--#include virtual="../../includes/_top.htm" --> |
|
<div class="content documentation documentation--current"> |
|
<!--#include virtual="../../includes/_nav.htm" --> |
|
<div class="right"> |
|
<!--#include virtual="../../includes/_docs_banner.htm" --> |
|
<ul class="breadcrumbs"> |
|
<li><a href="/documentation">Documentation</a></li> |
|
</ul> |
|
<div class="p-streams"></div> |
|
</div> |
|
</div> |
|
<!--#include virtual="../../includes/_footer.htm" --> |
|
|
|
<script> |
|
$(function() { |
|
// Show selected style on nav item |
|
$('.b-nav__streams').addClass('selected'); |
|
|
|
// Display docs subnav items |
|
$('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded'); |
|
|
|
// Show selected code example |
|
$('.btn-group a').click(function(){ |
|
var targetClass = '.b-' + $(this).data().section; |
|
$('.code-example__snippet, .btn-group a').removeClass('selected'); |
|
$(targetClass).addClass('selected'); |
|
}); |
|
}); |
|
</script>
|
|
|