<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<script>
<!--#include virtual="../js/templateData.js" -->
</script>
<script id="streams-template" type="text/x-handlebars-template">
<h1>Kafka Streams</h1>
<div class="sub-nav-sticky">
<div class="sticky-top">
<div style="height:35px">
<a class="active-menu-item" href="/{{version}}/documentation/streams/">Introduction</a>
<a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
<a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
<a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
<a href="/{{version}}/documentation/streams/architecture">Architecture</a>
<a href="/{{version}}/documentation/streams/developer-guide/">Developer Guide</a>
<a href="/{{version}}/documentation/streams/upgrade-guide">Upgrade</a>
</div>
</div>
</div>
<h3 class="streams_intro">The easiest way to write mission-critical real-time applications and microservices</h3>
<p class="streams__description">Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka's server-side cluster technology.</p>
<div class="video__series__grid">
<div class="yt__video__block">
<div class="yt__video__inner__block">
<iframe class="yt_series video_1 active" style="display:block" src="https://www.youtube.com/embed/Z3JKCLG3VP4?rel=0&showinfo=0&end=602" frameborder="0" allowfullscreen></iframe>
<iframe class="yt_series video_2" src="https://www.youtube.com/embed/LxxeXI1mPKo?rel=0&showinfo=0&end=622" frameborder="0" allowfullscreen></iframe>
<iframe class="yt_series video_3" src="https://www.youtube.com/embed/7JYEEx7SBuE?rel=0&showinfo=0end=557" frameborder="0" allowfullscreen></iframe>
<iframe class="yt_series video_4" src="https://www.youtube.com/embed/3kJgYIkAeHs?rel=0&showinfo=0&end=564" frameborder="0" allowfullscreen></iframe>
</div>
</div>
<div class="video__block">
<h3>TOUR OF THE STREAMS API</h3>
<div class="video__list">
<p class="video__item video_list_1 active" onclick="$('.video__item').removeClass('active'); $(this).addClass('active');$('.yt_series').hide();$('.video_1').show();">
<span class="number">1</span><span class="video__text">Intro to Streams</span>
</p>
<p class="video__item video_list_2" onclick="$('.video__item').removeClass('active'); $(this).addClass('active');$('.yt_series').hide();$('.video_2').show();">
<span class="number">2</span><span class="video__text">Creating a Streams Application</span>
</p>
<p class="video__item video_list_3" onclick="$('.video__item').removeClass('active'); $(this).addClass('active');$('.yt_series').hide();$('.video_3').show();">
<span class="number">3</span><span class="video__text">Transforming Data Pt. 1</span>
</p>
<p class="video__item video_list_4" onclick="$('.video__item').removeClass('active'); $(this).addClass('active');$('.yt_series').hide();$('.video_4').show();">
<span class="number">4</span><span class="video__text">Transforming Data Pt. 11</span>
</p>
</div>
</div>
</div>
<hr class="separator">
<div class="use-item-section">
<div class="use__list__sec">
<h3>Why you'll love using Kafka Streams!</h3>
<ul class="use-feature-list">
<li>Elastic, highly scalable, fault-tolerant</li>
<li>Deploy to containers, VMs, bare metal, cloud</li>
<li>Equally viable for small, medium, &amp; large use cases</li>
<li>Fully integrated with Kafka security</li>
<li>Write standard Java applications</li>
<li>Exactly-once processing semantics (see the configuration sketch below this section)</li>
<li>No separate processing cluster required</li>
<li>Develop on Mac, Linux, Windows</li>
</ul>
</div>
<div class="first__app__cta">
<a href="/{{version}}/documentation/streams/tutorial" class="first__app__btn">Write your first app</a>
</div>
</div>
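<p>For example, exactly-once processing is enabled through a single configuration setting. The snippet below is an illustrative sketch, not part of the WordCount example further down this page; the application id and broker address are placeholders, and the fragment assumes the same StreamsConfig import used in that example:</p>
<pre class="brush: java;">
// Illustrative sketch: enabling exactly-once processing semantics.
// The application id and bootstrap server are placeholders.
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-application");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
// Switch the processing guarantee from the default (at-least-once) to exactly-once.
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
</pre>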
<hr class="separator" id="streams-use-cases">
<h3 class="stream__text">Kafka Streams use cases</h3>
<div class="customers__grid">
<div class="customer__grid">
<div class="customer__item streams_logo_grid streams__ny__grid">
<a href="https://open.nytimes.com/publishing-with-apache-kafka-at-the-new-york-times-7f0e3b7d2077" target="_blank" class="grid__logo__link">
<span class="grid__item__logo" style="background-image: url('/images/powered-by/NYT.jpg');"></span>
</a>
<p class="grid__item__customer__description extra__space">
<a href="https://open.nytimes.com/publishing-with-apache-kafka-at-the-new-york-times-7f0e3b7d2077" target="_blank">The New York Times uses Apache Kafka </a>and the Kafka Streams to store and distribute, in real-time, published content to the various applications and systems that make it available to the readers.
</p>
</div>
</div>
<div class="customer__grid">
<div class="customer__item streams_logo_grid streams__zalando__grid">
<a href="https://www.confluent.io/blog/ranking-websites-real-time-apache-kafkas-streams-api/" target="_blank" class="grid__logo__link">
<span class="grid__item__logo" style="background-image: url('/images/powered-by/zalando.jpg');"></span>
</a>
<p class="grid__item__customer__description extra__space">As the leading online fashion retailer in Europe, Zalando uses Kafka as an ESB (Enterprise Service Bus), which helps us in transitioning from a monolithic to a micro services architecture. Using Kafka for processing
<a href="https://www.confluent.io/blog/ranking-websites-real-time-apache-kafkas-streams-api/" target='blank'> event streams</a> enables our technical team to do near-real time business intelligence.
</p>
</div>
</div>
<div class="customer__grid">
<div class="customer__item streams_logo_grid streams__line__grid">
<a href="https://engineering.linecorp.com/en/blog/detail/80" target="_blank" class="grid__logo__link">
<span class="grid__item__logo" style="background-image: url('/images/powered-by/line.svg');width:9rem"></span>
</a>
<p class="grid__item__customer__description extra__space"><a href="https://engineering.linecorp.com/en/blog/detail/80" target="_blank">LINE uses Apache Kafka</a> as a central datahub for our services to communicate to one another. Hundreds of billions of messages are produced daily and are used to execute various business logic, threat detection, search indexing and data analysis. LINE leverages Kafka Streams to reliably transform and filter topics enabling sub topics consumers can efficiently consume, meanwhile retaining easy maintainability thanks to its sophisticated yet minimal code base.</p>
</div>
</div>
<div class="customer__grid">
<div class="customer__item streams_logo_grid streams__ny__grid">
<a href="https://medium.com/@Pinterest_Engineering/using-kafka-streams-api-for-predictive-budgeting-9f58d206c996" target="_blank" class="grid__logo__link">
<span class="grid__item__logo" style="background-image: url('/images/powered-by/pinterest.png');"></span>
</a>
<p class="grid__item__customer__description">
<a href="https://medium.com/@Pinterest_Engineering/using-kafka-streams-api-for-predictive-budgeting-9f58d206c996" target="_blank">Pinterest uses Apache Kafka and the Kafka Streams</a> at large scale to power the real-time, predictive budgeting system of their advertising infrastructure. With Kafka Streams, spend predictions are more accurate than ever.
</p>
</div>
</div>
<div class="customer__grid">
<div class="customer__item streams_logo_grid streams__rabobank__grid">
<a href="https://www.confluent.io/blog/real-time-financial-alerts-rabobank-apache-kafkas-streams-api/" target="_blank" class="grid__logo__link">
<span class="grid__item__logo" style="background-image: url('/images/powered-by/rabobank.jpg');"></span>
</a>
<p class="grid__item__customer__description">Rabobank is one of the 3 largest banks in the Netherlands. Its digital nervous system, the Business Event Bus, is powered by Apache Kafka. It is used by an increasing amount of financial processes and services, one of which is Rabo Alerts. This service alerts customers in real-time upon financial events and is <a href="https://www.confluent.io/blog/real-time-financial-alerts-rabobank-apache-kafkas-streams-api/" target="_blank">built using Kafka Streams.</a></p>
</div>
</div>
<div class="customer__grid">
<div class="customer__item streams_logo_grid streams__ny__grid">
<a href="https://speakerdeck.com/xenji/kafka-and-debezium-at-trivago-code-dot-talks-2017-edition" target="_blank" class="grid__logo__link">
<span class="grid__item__logo" style="background-image: url('/images/powered-by/trivago.png');"></span>
</a>
<p class="grid__item__customer__description">
Trivago is a global hotel search platform. We are focused on reshaping the way travelers search for and compare hotels, while enabling hotel advertisers to grow their businesses by providing access to a broad audience of travelers via our websites and apps. As of 2017, we offer access to approximately 1.8 million hotels and other accommodations in over 190 countries. We use Kafka, Kafka Connect, and Kafka Streams to <a href="https://speakerdeck.com/xenji/kafka-and-debezium-at-trivago-code-dot-talks-2017-edition" target="_blank">enable our developers</a> to access data freely in the company. Kafka Streams powers parts of our analytics pipeline and delivers endless options to explore and operate on the data sources we have at hand.
</p>
</div>
</div>
</div>
<h3 style="margin-top: 5.3rem;">Hello Kafka Streams</h3>
<p>The code example below implements a WordCount application that is elastic, highly scalable, fault-tolerant, stateful, and ready to run in production at large scale.</p>
<div class="code-example">
<div class="btn-group">
<a class="selected b-java-8" data-section="java-8">Java 8+</a>
<a class="b-java-7" data-section="java-7">Java 7</a>
<a class="b-scala" data-section="scala">Scala</a>
</div>
<div class="code-example__snippet b-java-8 selected">
<pre class="brush: java;">
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Properties;

public class WordCountApplication {

    public static void main(final String[] args) throws Exception {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream&lt;String, String&gt; textLines = builder.stream("TextLinesTopic");
        KTable&lt;String, Long&gt; wordCounts = textLines
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word)
            .count(Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("counts-store"));
        wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();
    }

}
</pre>
</div>
<div class="code-example__snippet b-java-7">
<pre class="brush: java;">
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Properties;

public class WordCountApplication {

    public static void main(final String[] args) throws Exception {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream&lt;String, String&gt; textLines = builder.stream("TextLinesTopic");
        KTable&lt;String, Long&gt; wordCounts = textLines
            .flatMapValues(new ValueMapper&lt;String, Iterable&lt;String&gt;&gt;() {
                @Override
                public Iterable&lt;String&gt; apply(String textLine) {
                    return Arrays.asList(textLine.toLowerCase().split("\\W+"));
                }
            })
            .groupBy(new KeyValueMapper&lt;String, String, String&gt;() {
                @Override
                public String apply(String key, String word) {
                    return word;
                }
            })
            .count(Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("counts-store"));
        wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();
    }

}
</pre>
</div>
<div class="code-example__snippet b-scala">
<pre class="brush: scala;">
import java.lang.Long
import java.util.Properties
import java.util.concurrent.TimeUnit

import org.apache.kafka.common.serialization._
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream.{KStream, KTable, Materialized, Produced}
import org.apache.kafka.streams.state.KeyValueStore

import scala.collection.JavaConverters.asJavaIterableConverter

object WordCountApplication {

  def main(args: Array[String]) {
    val config: Properties = {
      val p = new Properties()
      p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
      p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
      p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
      p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
      p
    }

    val builder: StreamsBuilder = new StreamsBuilder()
    val textLines: KStream[String, String] = builder.stream("TextLinesTopic")
    val wordCounts: KTable[String, Long] = textLines
      .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
      .groupBy((_, word) => word)
      .count(Materialized.as("counts-store").asInstanceOf[Materialized[String, Long, KeyValueStore[Bytes, Array[Byte]]]])
    wordCounts.toStream().to("WordsWithCountsTopic", Produced.`with`(Serdes.String(), Serdes.Long()))

    val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
    streams.start()

    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      streams.close(10, TimeUnit.SECONDS)
    }))
  }

}
</pre>
</div>
</div>
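<p>The Scala example above registers a JVM shutdown hook so that the application closes its Streams instance, and with it the underlying state stores, cleanly when it terminates. The Java 8+ example can do the same; the fragment below is an illustrative sketch that assumes the streams variable created in that example:</p>
<pre class="brush: java;">
// Illustrative sketch: close the KafkaStreams instance cleanly on JVM shutdown,
// mirroring the shutdown hook in the Scala example. "streams" refers to the
// KafkaStreams instance created in the example above.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
</pre>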
<div class="pagination">
<a href="/{{version}}/documentation" class="pagination__btn pagination__btn__prev">Previous</a>
<a href="/{{version}}/documentation/streams/quickstart" class="pagination__btn pagination__btn__next">Next</a>
</div>
</script>
<!--#include virtual="../../includes/_header.htm" -->
<!--#include virtual="../../includes/_top.htm" -->
<div class="content documentation documentation--current">
<!--#include virtual="../../includes/_nav.htm" -->
<div class="right">
<!--#include virtual="../../includes/_docs_banner.htm" -->
<ul class="breadcrumbs">
<li><a href="/documentation">Documentation</a>
<li><a href="/documentation/streams">Kafka Streams</a></li>
</li>
</ul>
<div class="p-streams"></div>
</div>
</div>
<!--#include virtual="../../includes/_footer.htm" -->
<script>
$(function() {
// Show selected style on nav item
$('.b-nav__streams').addClass('selected');
$('.video_list_1').click(function(){
$('.video_2').attr('src', $('.video_2').attr('src'));
$('.video_3').attr('src', $('.video_3').attr('src'));
$('.video_4').attr('src', $('.video_4').attr('src'));
});
$('.video_list_2').click(function(){
$('.video_1').attr('src', $('.video_1').attr('src'));
$('.video_3').attr('src', $('.video_3').attr('src'));
$('.video_4').attr('src', $('.video_4').attr('src'));
});
$('.video_list_3').click(function(){
$('.video_1').attr('src', $('.video_1').attr('src'));
$('.video_2').attr('src', $('.video_2').attr('src'));
$('.video_4').attr('src', $('.video_4').attr('src'));
});
$('.video_list_4').click(function(){
$('.video_1').attr('src', $('.video_1').attr('src'));
$('.video_2').attr('src', $('.video_2').attr('src'));
$('.video_3').attr('src', $('.video_3').attr('src'));
});
//sticky secondary nav
var $navbar = $(".sub-nav-sticky"),
y_pos = $navbar.offset().top,
height = $navbar.height();
$(window).scroll(function() {
var scrollTop = $(window).scrollTop();
if (scrollTop > y_pos - height) {
$navbar.addClass("navbar-fixed")
} else if (scrollTop <= y_pos) {
$navbar.removeClass("navbar-fixed")
}
});
// Display docs subnav items
$('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
// Show selected code example
$('.btn-group a').click(function(){
var targetClass = '.b-' + $(this).data().section;
$('.code-example__snippet, .btn-group a').removeClass('selected');
$(targetClass).addClass('selected');
});
});
</script>