Streams
Overview
Kafka Streams is a client library for processing and analyzing data stored in Kafka and either write the resulting data back to Kafka or send the final output to an external system. It builds upon important stream processing concepts such as properly distinguishing between event time and processing time, windowing support, and simple yet efficient management of application state. Kafka Streams has a low barrier to entry: You can quickly write and run a small-scale proof-of-concept on a single machine; and you only need to run additional instances of your application on multiple machines to scale up to high-volume production workloads. Kafka Streams transparently handles the load balancing of multiple instances of the same application by leveraging Kafka's parallelism model.
Some highlights of Kafka Streams:
We first summarize the key concepts of Kafka Streams.
Kafka Streams offers two ways to define the stream processing topology: the Kafka Streams DSL provides the most common data transformation operations such as map, filter, join and aggregations out of the box; the lower-level Processor API allows developers to define and connect custom processors as well as to interact with state stores.
A critical aspect in stream processing is the notion of time, and how it is modeled and integrated. For example, some operations such as windowing are defined based on time boundaries.
Common notions of time in streams are:
- Event time: the point in time when an event or data record occurred, i.e. was originally created at the source.
- Processing time: the point in time when the event or data record happens to be processed by the stream processing application, i.e. when the record is being consumed.
- Ingestion time: the point in time when an event or data record is stored in a topic partition by a Kafka broker.
The choice between event-time and ingestion-time is actually done through the configuration of Kafka (not Kafka Streams): From Kafka 0.10.x onwards, timestamps are automatically embedded into Kafka messages. Depending on Kafka's configuration these timestamps represent event-time or ingestion-time. The respective Kafka configuration setting can be specified on the broker level or per topic. The default timestamp extractor in Kafka Streams will retrieve these embedded timestamps as-is. Hence, the effective time semantics of your application depend on the effective Kafka configuration for these embedded timestamps.
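For illustration, a sketch of the relevant configuration keys is shown below; the broker-level default lives in server.properties, and the same setting can be overridden per topic:

# Broker-level default in server.properties:
#   CreateTime    = producer-assigned timestamps (event-time)
#   LogAppendTime = broker-assigned timestamps (ingestion-time)
log.message.timestamp.type=CreateTime

# Per-topic override of the same setting (e.g. passed via --config when creating or altering the topic):
# message.timestamp.type=LogAppendTime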
Kafka Streams assigns a timestamp to every data record via the TimestampExtractor interface. Concrete implementations of this interface may retrieve or compute timestamps based on the actual contents of data records, such as an embedded timestamp field, to provide event-time semantics, or use any other approach such as returning the current wall-clock time at the time of processing, thereby yielding processing-time semantics to stream processing applications. Developers can thus enforce different notions of time depending on their business needs. For example, per-record timestamps describe the progress of a stream with regard to time (although records may be out-of-order within the stream) and are leveraged by time-dependent operations such as joins.
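For illustration, here is a minimal sketch of a custom extractor that provides event-time semantics for records whose value is the event time in epoch milliseconds (a simplifying assumption), falling back to the embedded message timestamp or to wall-clock time otherwise; it uses the two-parameter extract() signature mentioned in the API changes at the end of this section:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// A sketch of a custom TimestampExtractor: the record value is assumed to carry the
// event time in epoch milliseconds (an illustrative assumption, not a Kafka convention).
public class PayloadTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        Object value = record.value();
        if (value instanceof Long) {
            return (Long) value;                       // event-time embedded in the payload
        }
        // Fall back to the timestamp embedded in the Kafka message, or to wall-clock time
        // (i.e. processing-time semantics) if no valid embedded timestamp is available.
        long embedded = record.timestamp();
        return embedded >= 0 ? embedded : System.currentTimeMillis();
    }
}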
Finally, whenever a Kafka Streams application writes records to Kafka, then it will also assign timestamps to these new records. The way the timestamps are assigned depends on the context:
- When new output records are generated by processing an input record, for example via context.forward() triggered in the process() function call, output record timestamps are inherited from input record timestamps directly.
- When new output records are generated by periodic functions such as punctuate(), the output record timestamp is defined as the current internal time (obtained through context.timestamp()) of the stream task.

Some stream processing applications don't require state, which means the processing of a message is independent from the processing of all other messages. However, being able to maintain state opens up many possibilities for sophisticated stream processing applications: you can join input streams, or group and aggregate data records. Many such stateful operators are provided by the Kafka Streams DSL.
Kafka Streams provides so-called state stores, which can be used by stream processing applications to store and query data. This is an important capability when implementing stateful operations. Every task in Kafka Streams embeds one or more state stores that can be accessed via APIs to store and query data required for processing. These state stores can either be a persistent key-value store, an in-memory hashmap, or another convenient data structure. Kafka Streams offers fault-tolerance and automatic recovery for local state stores.
Kafka Streams allows direct read-only queries of the state stores by methods, threads, processes or applications external to the stream processing application that created the state stores. This is provided through a feature called Interactive Queries. All stores are named and Interactive Queries exposes only the read operations of the underlying implementation.
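For example, assuming the application has created a queryable key-value store named "Counts" with String keys and Integer values (as in the processor example below), and streams refers to the running KafkaStreams instance, a read-only lookup might look roughly like this:

// Obtain a read-only view of the local "Counts" store from the running KafkaStreams instance.
ReadOnlyKeyValueStore<String, Integer> counts =
    streams.store("Counts", QueryableStoreTypes.<String, Integer>keyValueStore());

// Only read operations are exposed: point lookups and range/all scans.
Integer aliceCount = counts.get("alice");

KeyValueIterator<String, Integer> all = counts.all();
while (all.hasNext()) {
    KeyValue<String, Integer> entry = all.next();
    System.out.println(entry.key + ": " + entry.value);
}
all.close();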
There is a quickstart example that shows how to run a stream processing program coded in the Kafka Streams library. This section focuses on how to write, configure, and execute a Kafka Streams application.
As we have mentioned above, the computational logic of a Kafka Streams application is defined as a processor topology. Currently Kafka Streams provides two sets of APIs to define the processor topology, which will be described in the subsequent sections.
Developers can define their customized processing logic by implementing the Processor interface, which provides the process and punctuate methods. The process method is performed on each of the received records, and the punctuate method is performed periodically based on elapsed time. In addition, the processor can maintain the current ProcessorContext instance variable initialized in the init method, and use the context to schedule the punctuation period (context().schedule), to forward the modified or new key-value pair to downstream processors (context().forward), to commit the current processing progress (context().commit), and so on.
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class MyProcessor implements Processor<String, String> {
    private ProcessorContext context;
    private KeyValueStore<String, Integer> kvStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.context.schedule(1000);
        this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
    }

    @Override
    public void process(String dummy, String line) {
        String[] words = line.toLowerCase().split(" ");

        for (String word : words) {
            Integer oldValue = this.kvStore.get(word);

            if (oldValue == null) {
                this.kvStore.put(word, 1);
            } else {
                this.kvStore.put(word, oldValue + 1);
            }
        }
    }

    @Override
    public void punctuate(long timestamp) {
        KeyValueIterator<String, Integer> iter = this.kvStore.all();

        while (iter.hasNext()) {
            KeyValue<String, Integer> entry = iter.next();
            context.forward(entry.key, entry.value.toString());
        }

        iter.close();
        context.commit();
    }

    @Override
    public void close() {
        this.kvStore.close();
    }
}
In the above implementation, the following actions are performed:
- In the init method, schedule the punctuation every 1 second and retrieve the local state store by its name "Counts".
- In the process method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this feature later in the section).
- In the punctuate method, iterate the local state store and send the aggregated counts to the downstream processor, and commit the current stream state.
With the customized processors defined in the Processor API, developers can use the TopologyBuilder
to build a processor topology
by connecting these processors together:
TopologyBuilder builder = new TopologyBuilder();

builder.addSource("SOURCE", "src-topic")
    .addProcessor("PROCESS1", MyProcessor1::new /* the ProcessorSupplier that can generate MyProcessor1 */, "SOURCE")
    .addProcessor("PROCESS2", MyProcessor2::new /* the ProcessorSupplier that can generate MyProcessor2 */, "PROCESS1")
    .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
    .addSink("SINK1", "sink-topic1", "PROCESS1")
    .addSink("SINK2", "sink-topic2", "PROCESS2")
    .addSink("SINK3", "sink-topic3", "PROCESS3");

There are several steps in the above code to build the topology, and here is a quick walk through:
- A source node named "SOURCE" is added to the topology using the addSource method, with one Kafka topic "src-topic" fed to it.
- Three processor nodes are then added using the addProcessor method; here the first processor is a child of the "SOURCE" node, but is the parent of the other two processors.
- Finally three sink nodes are added to complete the topology using the addSink method, each piping from a different parent processor node and writing to a separate topic.
Note that the Processor API is not limited to only accessing the current records as they arrive, but can also maintain local state stores that keep recently arrived records to use in stateful processing operations such as aggregation or windowed joins. To take advantage of these local state stores, developers can use the TopologyBuilder.addStateStore method when building the processor topology to create the local state and associate it with the processor nodes that need to access it; or they can connect a created local state store with the existing processor nodes through TopologyBuilder.connectProcessorAndStateStores.
TopologyBuilder builder = new TopologyBuilder();

builder.addSource("SOURCE", "src-topic")
    .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
    // create the in-memory state store "COUNTS" associated with processor "PROCESS1"
    .addStateStore(Stores.create("COUNTS").withStringKeys().withStringValues().inMemory().build(), "PROCESS1")
    .addProcessor("PROCESS2", MyProcessor2::new /* the ProcessorSupplier that can generate MyProcessor2 */, "PROCESS1")
    .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
    // connect the state store "COUNTS" with processor "PROCESS2"
    .connectProcessorAndStateStores("PROCESS2", "COUNTS")
    .addSink("SINK1", "sink-topic1", "PROCESS1")
    .addSink("SINK2", "sink-topic2", "PROCESS2")
    .addSink("SINK3", "sink-topic3", "PROCESS3");

In the next section we present another way to build the processor topology: the Kafka Streams DSL.
To build a processor topology using the Streams DSL, developers can use the KStreamBuilder class, which is extended from the TopologyBuilder.
A simple example is included with the source code for Kafka in the streams/examples package. The rest of this section will walk through some code to demonstrate the key steps in creating a topology using the Streams DSL, but we recommend that developers read the full example source code for details.
Before we discuss concepts such as aggregations in Kafka Streams we must first introduce tables, and most importantly the relationship between tables and streams: the so-called stream-table duality. Essentially, this duality means that a stream can be viewed as a table, and vice versa. Kafka’s log compaction feature, for example, exploits this duality.
A simple form of a table is a collection of key-value pairs, also called a map or associative array. Such a table may look as follows:
The stream-table duality describes the close relationship between streams and tables. Let’s illustrate this with an example. Imagine a table that tracks the total number of pageviews by user (first column of diagram below). Over time, whenever a new pageview event is processed, the state of the table is updated accordingly. Here, the state changes between different points in time – and different revisions of the table – can be represented as a changelog stream (second column).
Interestingly, because of the stream-table duality, the same stream can be used to reconstruct the original table (third column):
The same mechanism is used, for example, to replicate databases via change data capture (CDC) and, within Kafka Streams, to replicate its so-called state stores across machines for fault-tolerance. The stream-table duality is such an important concept that Kafka Streams models it explicitly via the KStream and KTable interfaces, which we describe in the next sections.
("alice", 1) --> ("alice", 3)If these records a KStream and the stream processing application were to sum the values it would return
4
. If these records were a KTable, the return would be 3
, since the last record would be considered as an update.
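A minimal sketch of this difference, assuming a hypothetical topic "user-clicks" with String keys and Integer values (the topic and store names are illustrative):

KStreamBuilder builder = new KStreamBuilder();

// Read the records as an event stream: every record is an independent fact.
KStream<String, Integer> clickStream = builder.stream(Serdes.String(), Serdes.Integer(), "user-clicks");

// Summing the stream counts both records, so ("alice", 1) and ("alice", 3) yield 4.
KTable<String, Integer> summed = clickStream
    .groupByKey(Serdes.String(), Serdes.Integer())
    .reduce((v1, v2) -> v1 + v2, "clicks-sum-store");

// Read the same topic as a changelog: ("alice", 3) overwrites ("alice", 1), so the current value is 3.
KTable<String, Integer> clickTable = builder.table(Serdes.String(), Serdes.Integer(), "user-clicks", "clicks-table-store");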
Either a record stream (defined as KStream) or a changelog stream (defined as KTable) can be created as a source stream from one or more Kafka topics (for KTable you can only create the source stream from a single topic).
KStreamBuilder builder = new KStreamBuilder();

KStream<String, GenericRecord> source1 = builder.stream("topic1", "topic2");
KTable<String, GenericRecord> source2 = builder.table("topic3", "stateStoreName");
Windowing is supported in the Kafka Streams DSL; the windows used for stream joins, for example, are specified through the JoinWindows class. In the Kafka Streams DSL users can specify a retention period for the window. This allows Kafka Streams to retain old window buckets for a period of time in order to wait for the late arrival of records whose timestamps fall within the window interval. If a record arrives after the retention period has passed, the record cannot be processed and is dropped.
Late-arriving records are always possible in real-time data streams. However, how late records are handled depends on the effective time semantics. Using processing-time, the semantics are “when the data is being processed”, which means that the notion of late records is not applicable as, by definition, no record can be late. Hence, late-arriving records can only really be considered as such (i.e. as arriving “late”) under event-time or ingestion-time semantics. In both cases, Kafka Streams is able to properly handle late-arriving records.
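For illustration, a retention period can be attached to a window specification via until(); the following sketch mirrors the windowed aggregation example shown later in this section and retains old window buckets for one hour (the window name and durations are illustrative):

// Same style of windowed aggregation as the DSL example later in this section, but with a
// retention period: old window buckets are kept for 1 hour so that late-arriving records
// whose timestamps fall into an old window can still update its aggregate.
KTable<Windowed<String>, Long> lateTolerantCounts = source1.groupByKey().aggregate(
    () -> 0L,                                        // initial value
    (aggKey, value, aggregate) -> aggregate + 1L,    // aggregating value
    TimeWindows.of("counts-with-retention", 5000L)
        .advanceBy(1000L)
        .until(60 * 60 * 1000L),                     // window retention period: 1 hour
    Serdes.Long()                                    // serde for aggregated value
);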
A join operation merges two streams based on the keys of their data records, and yields a new stream. Depending on the operands, the following join operations are supported in the Kafka Streams DSL:

- KStream-to-KStream joins are always windowed joins, since otherwise the state required to perform the join would grow indefinitely. Here, a newly received record from one of the streams is joined with the other stream's records within the specified window interval to produce one result for each matching pair based on a user-provided ValueJoiner. A new KStream instance representing the result stream of the join is returned from this operator.
- KTable-to-KTable joins are designed to be consistent with their counterparts in relational databases. Here, both changelog streams are first materialized into local state stores. When a new record is received from one of the streams, it is joined with the other stream's materialized state store to produce one result for each matching pair based on a user-provided ValueJoiner. A new KTable instance representing the result stream of the join, which is also a changelog stream of the represented table, is returned from this operator.
- KStream-to-KTable joins allow you to perform table lookups against a changelog stream (KTable) upon receiving a new record from another record stream (KStream). An example use case would be to enrich a stream of user activities (KStream) with the latest user profile information (KTable). Only records received from the record stream will trigger the join and produce results via ValueJoiner, not vice versa (i.e., records received from the changelog stream will be used only to update the materialized state store). A new KStream instance representing the result stream of the join is returned from this operator.
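As a sketch of a windowed KStream-to-KStream join (the topic names are hypothetical, default serdes are assumed to be configured, and the exact JoinWindows factory methods vary slightly across releases):

KStream<String, String> clicks = builder.stream("click-topic");           // hypothetical topic
KStream<String, String> impressions = builder.stream("impression-topic"); // hypothetical topic

// Join click and impression records with the same key whose timestamps are at most 5 seconds apart.
KStream<String, String> matched = clicks.join(
    impressions,
    (clickValue, impressionValue) -> clickValue + "/" + impressionValue,  // ValueJoiner
    JoinWindows.of("click-impression-join").within(5000L)                 // 5-second join window
);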
In the Kafka Streams DSL, an input stream of an aggregation can be a KStream or a KTable, but the output stream will always be a KTable. This allows Kafka Streams to update an aggregate value upon the late arrival of further records after the value was produced and emitted. When such late arrival happens, the aggregating KStream or KTable simply emits a new aggregate value. Because the output is a KTable, the new value is considered to overwrite the old value with the same key in subsequent processing steps.
Besides join and aggregation operations, there is a list of other transformation operations provided for KStream and KTable respectively. Each of these operations may generate one or more KStream and KTable objects and can be translated into one or more connected processors in the underlying processor topology. All these transformation methods can be chained together to compose a complex processor topology. Since KStream and KTable are strongly typed, all these transformation operations are defined as generic functions where users can specify the input and output data types.
Among these transformations, filter, map, mapValues, etc., are stateless transformation operations that can be applied to both KStream and KTable, where users can usually pass a customized function to these operations as a parameter, such as a Predicate for filter, a KeyValueMapper for map, etc.:
// written in Java 8+, using lambda expressions
// (assuming the "category" field of the Avro record is itself a GenericRecord)
KStream<String, GenericRecord> mapped = source1.mapValues(record -> (GenericRecord) record.get("category"));
Stateless transformations, by definition, do not depend on any state for processing, and hence implementation-wise they do not require a state store associated with the stream processor. Stateful transformations, on the other hand, require accessing an associated state for processing and producing outputs. For example, in join and aggregate operations, a windowing state is usually used to store all the records received so far within the defined window boundary. The operators can then access these accumulated records in the store and compute based on them.
// written in Java 8+, using lambda expressions
KTable<Windowed<String>, Long> counts = source1.groupByKey().aggregate(
    () -> 0L,  // initial value
    (aggKey, value, aggregate) -> aggregate + 1L,     // aggregating value
    TimeWindows.of("counts", 5000L).advanceBy(1000L), // intervals in milliseconds
    Serdes.Long()                                     // serde for aggregated value
);

KStream<String, String> joined = source1.leftJoin(source2,
    (record1, record2) -> record1.get("user") + "-" + record2.get("region")
);
At the end of the processing, users can choose to (continuously) write the resulting streams back to a Kafka topic through KStream.to and KTable.to.
joined.to("topic4");If your application needs to continue reading and processing the records after they have been materialized to a topic via
to
above, one option is to construct a new stream that reads from the output topic;
Kafka Streams provides a convenience method called through
:
// equivalent to
//
// joined.to("topic4");
// materialized = builder.stream("topic4");
KStream<String, String> materialized = joined.through("topic4");
Besides defining the topology, developers will also need to configure their application in StreamsConfig before running it. A complete list of Kafka Streams configs can be found here.
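For example, a minimal configuration and start-up sketch could look like this (the application id and broker address are placeholders, and builder refers to the topology builder constructed in the previous sections):

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");     // unique id of this application
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // Kafka broker(s) to connect to

// Create and start the Kafka Streams instance from the topology defined above.
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

// ... later, shut the application down cleanly.
streams.close();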
See the Upgrade Section for details. The most notable API changes are listed below.

New methods in KafkaStreams:
- #setStateListener(StateListener listener)
- #state()
- #metrics()
- #close(long timeout, TimeUnit timeUnit)
- #toString(String indent)

Parameter updates in StreamsConfig:
- the parameter zookeeper.connect was deprecated

Changes in the StreamsMetrics interface:
- removed method: #addLatencySensor()
- added methods: #addLatencyAndThroughputSensor(), #addThroughputSensor(), #recordThroughput(), #addSensor(), #removeSensor()

New methods in TopologyBuilder:
- added overloads for #addSource() that allow you to define an auto.offset.reset policy per source node
- added method #addGlobalStore() to add global StateStores

New methods in KStreamBuilder:
- added overloads for #stream() and #table() that allow you to define an auto.offset.reset policy per input stream/table
- #table() always requires a store name
- added method #globalKTable() to create a GlobalKTable

New joins for KStream:
- added overloads for #join() to join with KTable
- added overloads for #join() and leftJoin() to join with GlobalKTable

Null-key handling for KTable joins:
- KTable-KTable joins do not throw an exception on null key records anymore, but drop those records silently

New window type Session Windows:
- added class SessionWindows to specify session windows
- added overloads for the KGroupedStream methods #count(), #reduce(), and #aggregate() to allow session window aggregations

Changes to TimestampExtractor:
- method #extract() has a second parameter now
- new default timestamp extractor class FailOnInvalidTimestamp (it gives the same behavior as the old (and removed) default extractor ConsumerRecordTimestampExtractor)
- new alternative timestamp extractor classes LogAndSkipOnInvalidTimestamp and UsePreviousTimeOnInvalidTimestamps