
KAFKA-5654; add materialized count, reduce, aggregate to KGroupedStream

Add overloads of `count`, `reduce`, and `aggregate` to `KGroupedStream` that take a `Materialized` parameter.
Refactor the common parts between `KGroupedStream` and `WindowedKStream` into a shared `GroupedStreamAggregateBuilder`.
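
A minimal usage sketch of the new overloads (the topic and store names here are illustrative, not part of the patch):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KGroupedStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;

    final StreamsBuilder builder = new StreamsBuilder();
    // assumes a source topic "words" with String keys and values
    final KGroupedStream<String, String> grouped = builder
        .stream("words", Consumed.with(Serdes.String(), Serdes.String()))
        .groupByKey();

    // count records per key, materialized into a queryable store named "counts"
    final KTable<String, Long> counts = grouped.count(
        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.Long()));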

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3827 from dguy/kafka-5654
Damian Guy, 7 years ago
commit d83252ebae
7 changed files:
  1. streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java (210 lines changed)
  2. streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java (76 lines changed)
  3. streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java (127 lines changed)
  4. streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java (25 lines changed)
  5. streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java (13 lines changed)
  6. streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java (57 lines changed)
  7. streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java (106 lines changed)

210 lines changed: streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java

@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
@@ -145,6 +146,38 @@ public interface KGroupedStream<K, V> {
*/
KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Count the number of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored.
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* provided by the given {@code storeSupplier}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
* <pre>{@code
* KafkaStreams streams = ... // counting words
* String queryableStoreName = "count-store"; // the queryableStoreName should be the name of the store as defined by the Materialized instance
* ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-word";
* Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
* @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
* represent the latest (rolling) count (i.e., number of records) for each key
*/
KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Count the number of records in this stream by the grouped key and the defined windows.
* Records with {@code null} key or value are ignored.
@@ -395,7 +428,7 @@ public interface KGroupedStream<K, V> {
* and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
* @param reducer a {@link Reducer} that computes a new aggregate result
* @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
*/
@@ -452,12 +485,14 @@ public interface KGroupedStream<K, V> {
* provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
* @param reducer a {@link Reducer} that computes a new aggregate result
* @param queryableStoreName the name of the underlying {@link KTable} state store; valid characters are ASCII
* @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
* @param queryableStoreName the name of the underlying {@link KTable} state store; valid characters are ASCII
* alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer)}.
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
* @deprecated use {@link #reduce(Reducer, Materialized)}
*/
@Deprecated
KTable<K, V> reduce(final Reducer<V> reducer,
final String queryableStoreName);
@@ -507,14 +542,68 @@ public interface KGroupedStream<K, V> {
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* @param reducer a {@link Reducer} that computes a new aggregate result
* @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
* @deprecated use {@link #reduce(Reducer, Materialized)}
*/
@Deprecated
KTable<K, V> reduce(final Reducer<V> reducer,
final StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Combine the value of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored.
* Combining implies that the type of the aggregate result is the same as the type of the input value
* (c.f. {@link #aggregate(Initializer, Aggregator, Materialized)}).
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* as specified by the given {@code materialized} instance.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
* <p>
* The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
* aggregate (first argument) and the record's value (second argument):
* <pre>{@code
* // Example: a Reducer<Long>
* new Reducer<Long>() {
* @Override
* public Long apply(Long aggValue, Long currValue) {
* return aggValue + currValue;
* }
* }
* }</pre>
* <p>
* If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
* value as-is.
* Thus, {@code reduce(Reducer, Materialized)} can be used to compute aggregate functions like sum, min, or
* max.
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
* <pre>{@code
* KafkaStreams streams = ... // compute sum
* String queryableStoreName = "storeName" // the queryableStoreName should be the name of the store as defined by the Materialized instance
* ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long sumForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
* @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
*/
KTable<K, V> reduce(final Reducer<V> reducer,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Combine the values of records in this stream by the grouped key and the defined windows.
* Records with {@code null} key or value are ignored.
@@ -678,7 +767,7 @@ public interface KGroupedStream<K, V> {
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* @param reducer a {@link Reducer} that computes a new aggregate result
* @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
* @param windows the specification of the aggregation {@link Windows}
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
@@ -743,7 +832,7 @@ public interface KGroupedStream<K, V> {
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
* provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
* @param reducer the instance of {@link Reducer}
* @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
* @param sessionWindows the specification of the aggregation {@link SessionWindows}
* @param queryableStoreName the name of the state store created from this operation; valid characters are ASCII
* alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer, SessionWindows)}.
@@ -778,7 +867,7 @@ public interface KGroupedStream<K, V> {
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* @param reducer the instance of {@link Reducer}
* @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
* @param sessionWindows the specification of the aggregation {@link SessionWindows}
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
@@ -841,7 +930,7 @@ public interface KGroupedStream<K, V> {
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
* provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
* @param reducer the instance of {@link Reducer}
* @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
* @param sessionWindows the specification of the aggregation {@link SessionWindows}
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
@@ -905,12 +994,113 @@ public interface KGroupedStream<K, V> {
* @param <VR> the value type of the resulting {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
* @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized)}
*/
@Deprecated
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Serde<VR> aggValueSerde,
final String queryableStoreName);
/**
* Aggregate the values of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored.
* Aggregating is a generalization of {@link #reduce(Reducer, Materialized) combining via reduce(...)} as it, for example,
* allows the result to have a different type than the input values.
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* as specified by the given {@code materialized} instance.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
* <p>
* The specified {@link Initializer} is applied once directly before the first input record is processed to
* provide an initial intermediate aggregation result that is used to process the first record.
* The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
* Thus, {@code aggregate(Initializer, Aggregator, Materialized)} can be used to compute aggregate functions like
* count (c.f. {@link #count(Materialized)}).
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // some aggregation on value type Long
* String queryableStoreName = "aggregation-store"; // the store name defined by the Materialized instance
* ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII
* alphanumerics, '.', '_' and '-'.
* The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
* provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
* @param aggregator an {@link Aggregator} that computes a new aggregate result
* @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
* @param <VR> the value type of the resulting {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
*/
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Aggregate the values of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored.
* Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
* allows the result to have a different type than the input values.
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* using an internally generated store name, which may not be queryable through Interactive Queries.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
* <p>
* The specified {@link Initializer} is applied once directly before the first input record is processed to
* provide an initial intermediate aggregation result that is used to process the first record.
* The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
* Thus, {@code aggregate(Initializer, Aggregator)} can be used to compute aggregate functions like
* count (c.f. {@link #count(String)}).
* <p>
* The default value serde from config will be used for serializing the result.
* If a different serde is required then you should use {@link #aggregate(Initializer, Aggregator, Materialized)}.
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
* Note that the internal store name may not be queryable through Interactive Queries.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
* @param aggregator an {@link Aggregator} that computes a new aggregate result
* @param <VR> the value type of the resulting {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
*/
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator);
/**
* Aggregate the values of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored.
@@ -950,7 +1140,9 @@ public interface KGroupedStream<K, V> {
* @param <VR> the value type of the resulting {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
* @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized)}
*/
@Deprecated
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Serde<VR> aggValueSerde);
@@ -999,7 +1191,9 @@ public interface KGroupedStream<K, V> {
* @param <VR> the value type of the resulting {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
* @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized)}
*/
@Deprecated
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final StateStoreSupplier<KeyValueStore> storeSupplier);
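
For reference, a hedged sketch of the interactive-query pattern the new Javadoc describes (the store name "counts" matches the sketch in the commit description and is illustrative):

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    // streams is a running KafkaStreams instance built from the topology above
    final ReadOnlyKeyValueStore<String, Long> localStore =
        streams.store("counts", QueryableStoreTypes.<String, Long>keyValueStore());
    final Long count = localStore.get("some-word"); // only locally hosted keys resolve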

76 lines changed: streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java

@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Collections;
import java.util.Set;
class GroupedStreamAggregateBuilder<K, V> {
private final InternalStreamsBuilder builder;
private final Serde<K> keySerde;
private final Serde<V> valueSerde;
private final boolean repartitionRequired;
private final Set<String> sourceNodes;
private final String name;
GroupedStreamAggregateBuilder(final InternalStreamsBuilder builder,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final boolean repartitionRequired,
final Set<String> sourceNodes,
final String name) {
this.builder = builder;
this.keySerde = keySerde;
this.valueSerde = valueSerde;
this.repartitionRequired = repartitionRequired;
this.sourceNodes = sourceNodes;
this.name = name;
}
<T> KTable<K, T> build(final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
final String functionName,
final StoreBuilder storeBuilder,
final boolean isQueryable) {
final String aggFunctionName = builder.newName(functionName);
final String sourceName = repartitionIfRequired(storeBuilder.name());
builder.internalTopologyBuilder.addProcessor(aggFunctionName, aggregateSupplier, sourceName);
builder.internalTopologyBuilder.addStateStore(storeBuilder, aggFunctionName);
return new KTableImpl<>(
builder,
aggFunctionName,
aggregateSupplier,
sourceName.equals(this.name) ? sourceNodes : Collections.singleton(sourceName),
storeBuilder.name(),
isQueryable);
}
/**
* @return the new sourceName if repartitioned. Otherwise the name of this stream
*/
private String repartitionIfRequired(final String queryableStoreName) {
if (!repartitionRequired) {
return this.name;
}
return KStreamImpl.createReparitionedSource(builder, keySerde, valueSerde, queryableStoreName, name);
}
}
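
The builder centralizes the repartition-then-aggregate wiring that KGroupedStreamImpl and WindowedKStreamImpl previously duplicated. A simplified sketch of a call site (names mirror the diffs below; isQueryable comes from MaterializedInternal):

    // wires: optional repartition -> aggregation processor -> state store -> KTable
    final KTable<K, T> table = aggregateBuilder.build(
        new KStreamAggregate<>(storeBuilder.name(), initializer, aggregator), // processor supplier
        AGGREGATE_NAME,   // prefix used to generate the processor node name
        storeBuilder,     // builds the backing key-value store
        isQueryable);     // false for internally named stores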

127 lines changed: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java

@@ -18,10 +18,12 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.SessionWindows;
@@ -32,6 +34,7 @@ import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.WindowStore;
import java.util.Collections;
@@ -46,6 +49,19 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
private final Serde<K> keySerde;
private final Serde<V> valSerde;
private final boolean repartitionRequired;
private final Initializer<Long> countInitializer = new Initializer<Long>() {
@Override
public Long apply() {
return 0L;
}
};
private final Aggregator<K, V, Long> countAggregator = new Aggregator<K, V, Long>() {
@Override
public Long apply(K aggKey, V value, Long aggregate) {
return aggregate + 1;
}
};
private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
private boolean isQueryable = true;
KGroupedStreamImpl(final InternalStreamsBuilder builder,
@@ -55,6 +71,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
final Serde<V> valSerde,
final boolean repartitionRequired) {
super(builder, name, sourceNodes);
this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(builder,
keySerde,
valSerde,
repartitionRequired,
sourceNodes,
name);
this.keySerde = keySerde;
this.valSerde = valSerde;
this.repartitionRequired = repartitionRequired;
@@ -91,6 +113,19 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
storeSupplier);
}
@Override
public KTable<K, V> reduce(final Reducer<V> reducer,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(reducer, "reducer can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal
= new MaterializedInternal<>(materialized);
return doAggregate(
new KStreamReduce<K, V>(materializedInternal.storeName(), reducer),
REDUCE_NAME,
materializedInternal);
}
@Override
public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
@@ -130,6 +165,41 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
return aggregate(initializer, aggregator, keyValueStore(keySerde, aggValueSerde, getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
}
@Override
public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(aggregator, "aggregator can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
return aggregateMaterialized(initializer, aggregator, materialized);
}
private <VR> KTable<K, VR> aggregateMaterialized(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal
= new MaterializedInternal<>(materialized);
return doAggregate(
new KStreamAggregate<>(materializedInternal.storeName(), initializer, aggregator),
AGGREGATE_NAME,
materializedInternal);
}
@Override
public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator) {
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(aggregator, "aggregator can't be null");
final String storeName = builder.newStoreName(AGGREGATE_NAME);
MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(Materialized.<K, VR, KeyValueStore<Bytes, byte[]>>as(storeName), false);
return doAggregate(new KStreamAggregate<>(materializedInternal.storeName(), initializer, aggregator),
AGGREGATE_NAME,
materializedInternal);
}
@Override
public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
@@ -198,17 +268,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
@Override
public KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier) {
return aggregate(new Initializer<Long>() {
@Override
public Long apply() {
return 0L;
}
}, new Aggregator<K, V, Long>() {
@Override
public Long apply(K aggKey, V value, Long aggregate) {
return aggregate + 1;
}
}, storeSupplier);
return aggregate(countInitializer, countAggregator, storeSupplier);
}
@Override
public KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
return aggregate(countInitializer, countAggregator, materialized);
}
@Override
@@ -227,17 +292,8 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
final StateStoreSupplier<WindowStore> storeSupplier) {
return aggregate(
new Initializer<Long>() {
@Override
public Long apply() {
return 0L;
}
}, new Aggregator<K, V, Long>() {
@Override
public Long apply(K aggKey, V value, Long aggregate) {
return aggregate + 1;
}
},
countInitializer,
countAggregator,
windows,
storeSupplier);
}
@@ -320,18 +376,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
final StateStoreSupplier<SessionStore> storeSupplier) {
Objects.requireNonNull(sessionWindows, "sessionWindows can't be null");
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
final Initializer<Long> initializer = new Initializer<Long>() {
@Override
public Long apply() {
return 0L;
}
};
final Aggregator<K, V, Long> aggregator = new Aggregator<K, V, Long>() {
@Override
public Long apply(final K aggKey, final V value, final Long aggregate) {
return aggregate + 1;
}
};
final Merger<K, Long> sessionMerger = new Merger<K, Long>() {
@Override
public Long apply(final K aggKey, final Long aggOne, final Long aggTwo) {
@@ -339,7 +383,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
}
};
return aggregate(initializer, aggregator, sessionMerger, sessionWindows, Serdes.Long(), storeSupplier);
return aggregate(countInitializer, countAggregator, sessionMerger, sessionWindows, Serdes.Long(), storeSupplier);
}
@@ -397,6 +441,17 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
return aggregate(initializer, aggregator, sessionMerger, sessionWindows, valSerde, storeSupplier);
}
private <T> KTable<K, T> doAggregate(final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
final String functionName,
final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materializedInternal) {
final StoreBuilder<KeyValueStore<K, T>> storeBuilder = new KeyValueStoreMaterializer<>(materializedInternal)
.materialize();
return aggregateBuilder.build(aggregateSupplier, functionName, storeBuilder, materializedInternal.isQueryable());
}
private <T> KTable<K, T> doAggregate(
final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
final String functionName,
@@ -426,6 +481,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
if (!repartitionRequired) {
return this.name;
}
return KStreamImpl.createReparitionedSource(this, keySerde, valSerde, queryableStoreName);
return KStreamImpl.createReparitionedSource(builder, keySerde, valSerde, queryableStoreName, name);
}
}
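
The new two-argument aggregate overload in use, reusing the grouped stream from the first sketch and mirroring shouldAggregateWithDefaultSerdes in the test diff below (a sketch; it relies on the default serdes from StreamsConfig and an internally generated, non-queryable store name):

    import org.apache.kafka.streams.kstream.Aggregator;
    import org.apache.kafka.streams.kstream.Initializer;

    final KTable<String, String> aggregated = grouped.aggregate(
        new Initializer<String>() {
            @Override
            public String apply() {
                return ""; // initial aggregate value
            }
        },
        new Aggregator<String, String, String>() {
            @Override
            public String apply(final String key, final String value, final String aggregate) {
                return aggregate + value; // concatenate values per key
            }
        });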

25 lines changed: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java

@@ -605,37 +605,38 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
*/
private KStreamImpl<K, V> repartitionForJoin(final Serde<K> keySerde,
final Serde<V> valSerde) {
String repartitionedSourceName = createReparitionedSource(this, keySerde, valSerde, null);
String repartitionedSourceName = createReparitionedSource(builder, keySerde, valSerde, null, name);
return new KStreamImpl<>(builder, repartitionedSourceName, Collections
.singleton(repartitionedSourceName), false);
}
static <K1, V1> String createReparitionedSource(final AbstractStream<K1> stream,
static <K1, V1> String createReparitionedSource(final InternalStreamsBuilder builder,
final Serde<K1> keySerde,
final Serde<V1> valSerde,
final String topicNamePrefix) {
final String topicNamePrefix,
final String name) {
Serializer<K1> keySerializer = keySerde != null ? keySerde.serializer() : null;
Serializer<V1> valSerializer = valSerde != null ? valSerde.serializer() : null;
Deserializer<K1> keyDeserializer = keySerde != null ? keySerde.deserializer() : null;
Deserializer<V1> valDeserializer = valSerde != null ? valSerde.deserializer() : null;
String baseName = topicNamePrefix != null ? topicNamePrefix : stream.name;
String baseName = topicNamePrefix != null ? topicNamePrefix : name;
String repartitionTopic = baseName + REPARTITION_TOPIC_SUFFIX;
String sinkName = stream.builder.newName(SINK_NAME);
String filterName = stream.builder.newName(FILTER_NAME);
String sourceName = stream.builder.newName(SOURCE_NAME);
String sinkName = builder.newName(SINK_NAME);
String filterName = builder.newName(FILTER_NAME);
String sourceName = builder.newName(SOURCE_NAME);
stream.builder.internalTopologyBuilder.addInternalTopic(repartitionTopic);
stream.builder.internalTopologyBuilder.addProcessor(filterName, new KStreamFilter<>(new Predicate<K1, V1>() {
builder.internalTopologyBuilder.addInternalTopic(repartitionTopic);
builder.internalTopologyBuilder.addProcessor(filterName, new KStreamFilter<>(new Predicate<K1, V1>() {
@Override
public boolean test(final K1 key, final V1 value) {
return key != null;
}
}, false), stream.name);
}, false), name);
stream.builder.internalTopologyBuilder.addSink(sinkName, repartitionTopic, keySerializer, valSerializer,
builder.internalTopologyBuilder.addSink(sinkName, repartitionTopic, keySerializer, valSerializer,
null, filterName);
stream.builder.internalTopologyBuilder.addSource(null, sourceName, new FailOnInvalidTimestamp(),
builder.internalTopologyBuilder.addSource(null, sourceName, new FailOnInvalidTimestamp(),
keyDeserializer, valDeserializer, repartitionTopic);
return sourceName;

13 lines changed: streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java

@@ -25,8 +25,15 @@ import java.util.Map;
public class MaterializedInternal<K, V, S extends StateStore> extends Materialized<K, V, S> {
private final boolean queryable;
public MaterializedInternal(final Materialized<K, V, S> materialized) {
this(materialized, true);
}
MaterializedInternal(final Materialized<K, V, S> materialized, final boolean queryable) {
super(materialized);
this.queryable = queryable;
}
public String storeName() {
@@ -56,7 +63,11 @@ public class MaterializedInternal<K, V, S extends StateStore> extends Materializ
return topicConfig;
}
public boolean cachingEnabled() {
boolean cachingEnabled() {
return cachingEnabled;
}
public boolean isQueryable() {
return queryable;
}
}
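
The package-private constructor lets internal callers mark a store as non-queryable. A sketch of the intended use, mirroring the new two-argument aggregate in KGroupedStreamImpl (storeName is generated via builder.newStoreName):

    // queryable = false: the store backs the aggregation but is not exposed
    // through Interactive Queries
    final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> internal =
        new MaterializedInternal<>(
            Materialized.<K, VR, KeyValueStore<Bytes, byte[]>>as(storeName),
            false);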

57 lines changed: streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java

@@ -30,7 +30,6 @@ import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
@@ -40,9 +39,9 @@ import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDU
public class WindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<K> implements WindowedKStream<K, V> {
private final Windows<W> windows;
private final boolean repartitionRequired;
private final Serde<K> keySerde;
private final Serde<V> valSerde;
private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
WindowedKStreamImpl(final Windows<W> windows,
final InternalStreamsBuilder builder,
@@ -55,8 +54,8 @@ public class WindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<
Objects.requireNonNull(windows, "windows can't be null");
this.valSerde = valSerde;
this.keySerde = keySerde;
this.repartitionRequired = repartitionRequired;
this.windows = windows;
this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(builder, keySerde, valSerde, repartitionRequired, sourceNodes, name);
}
@Override
@@ -76,38 +75,34 @@ public class WindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<
Serdes.Long());
}
@SuppressWarnings("unchecked")
@Override
public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Serde<VR> aggValueSerde) {
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(aggregator, "aggregator can't be null");
final String aggFunctionName = builder.newName(AGGREGATE_NAME);
final String storeName = builder.newStoreName(AGGREGATE_NAME);
return doAggregate(aggValueSerde,
aggFunctionName,
storeName,
new KStreamWindowAggregate<>(windows, storeName, initializer, aggregator));
return (KTable<Windowed<K>, VR>) aggregateBuilder.build(new KStreamWindowAggregate<>(windows, storeName, initializer, aggregator),
AGGREGATE_NAME,
windowStoreBuilder(storeName, aggValueSerde),
false);
}
@SuppressWarnings("unchecked")
@Override
public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) {
Objects.requireNonNull(reducer, "reducer can't be null");
final String storeName = builder.newStoreName(REDUCE_NAME);
return doAggregate(valSerde,
builder.newName(REDUCE_NAME),
storeName,
new KStreamWindowReduce<>(windows, storeName, reducer));
return (KTable<Windowed<K>, V>) aggregateBuilder.build(new KStreamWindowReduce<K, V, W>(windows, storeName, reducer),
REDUCE_NAME,
windowStoreBuilder(storeName, valSerde),
false);
}
@SuppressWarnings("unchecked")
private <VR> KTable<Windowed<K>, VR> doAggregate(final Serde<VR> aggValueSerde,
final String aggFunctionName,
final String storeName,
final KStreamAggProcessorSupplier aggSupplier) {
final String sourceName = repartitionIfRequired(storeName);
final StoreBuilder<WindowStore<K, VR>> storeBuilder = Stores.windowStoreBuilder(
private <VR> StoreBuilder<WindowStore<K, VR>> windowStoreBuilder(final String storeName, final Serde<VR> aggValueSerde) {
return Stores.windowStoreBuilder(
Stores.persistentWindowStore(
storeName,
windows.maintainMs(),
@@ -115,29 +110,7 @@ public class WindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<
windows.size(),
false),
keySerde,
aggValueSerde)
.withCachingEnabled();
builder.internalTopologyBuilder.addProcessor(aggFunctionName, aggSupplier, sourceName);
builder.internalTopologyBuilder.addStateStore(storeBuilder, aggFunctionName);
return new KTableImpl<>(
builder,
aggFunctionName,
aggSupplier,
sourceName.equals(this.name) ? sourceNodes
: Collections.singleton(sourceName),
storeName,
false);
aggValueSerde).withCachingEnabled();
}
/**
* @return the new sourceName if repartitioned. Otherwise the name of this stream
*/
private String repartitionIfRequired(final String queryableStoreName) {
if (!repartitionRequired) {
return this.name;
}
return KStreamImpl.createReparitionedSource(this, keySerde, valSerde, queryableStoreName);
}
}
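
With the shared builder in place, windowed aggregations flow through the same path. A hedged sketch, assuming the grouped stream from the first example and KGroupedStream#windowedBy from the same KIP-182 work:

    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;

    // one-minute tumbling windows; the windowed store is internal (not queryable by name)
    final KTable<Windowed<String>, Long> windowedCounts = grouped
        .windowedBy(TimeWindows.of(60 * 1000L))
        .count();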

106 lines changed: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java

@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@@ -27,6 +28,7 @@ import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Serialized;
@@ -486,6 +488,110 @@ public class KGroupedStreamImplTest {
groupedStream.count(SessionWindows.with(90), (StateStoreSupplier<SessionStore>) null);
}
@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnReduceWhenMaterializedIsNull() {
groupedStream.reduce(MockReducer.STRING_ADDER, (Materialized) null);
}
@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnAggregateWhenMaterializedIsNull() {
groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (Materialized) null);
}
@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnCountWhenMaterializedIsNull() {
groupedStream.count((Materialized) null);
}
@SuppressWarnings("unchecked")
@Test
public void shouldCountAndMaterializeResults() {
groupedStream.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long()));
processData();
final KeyValueStore<String, Long> count = (KeyValueStore<String, Long>) driver.allStateStores().get("count");
assertThat(count.get("1"), equalTo(3L));
assertThat(count.get("2"), equalTo(1L));
assertThat(count.get("3"), equalTo(2L));
}
@SuppressWarnings("unchecked")
@Test
public void shouldReduceAndMaterializeResults() {
groupedStream.reduce(MockReducer.STRING_ADDER,
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("reduce")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()));
processData();
final KeyValueStore<String, String> reduced = (KeyValueStore<String, String>) driver.allStateStores().get("reduce");
assertThat(reduced.get("1"), equalTo("A+C+D"));
assertThat(reduced.get("2"), equalTo("B"));
assertThat(reduced.get("3"), equalTo("E+F"));
}
@SuppressWarnings("unchecked")
@Test
public void shouldAggregateAndMaterializeResults() {
groupedStream.aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("aggregate")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()));
processData();
final KeyValueStore<String, String> aggregate = (KeyValueStore<String, String>) driver.allStateStores().get("aggregate");
assertThat(aggregate.get("1"), equalTo("0+A+C+D"));
assertThat(aggregate.get("2"), equalTo("0+B"));
assertThat(aggregate.get("3"), equalTo("0+E+F"));
}
@SuppressWarnings("unchecked")
@Test
public void shouldAggregateWithDefaultSerdes() {
final Map<String, String> results = new HashMap<>();
groupedStream.aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER)
.toStream()
.foreach(new ForeachAction<String, String>() {
@Override
public void apply(final String key, final String value) {
results.put(key, value);
}
});
processData();
assertThat(results.get("1"), equalTo("0+A+C+D"));
assertThat(results.get("2"), equalTo("0+B"));
assertThat(results.get("3"), equalTo("0+E+F"));
}
private void processData() {
driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), 0);
driver.setTime(0);
driver.process(TOPIC, "1", "A");
driver.process(TOPIC, "2", "B");
driver.process(TOPIC, "1", "C");
driver.process(TOPIC, "1", "D");
driver.process(TOPIC, "3", "E");
driver.process(TOPIC, "3", "F");
driver.flushState();
}
private void doCountWindowed(final List<KeyValue<Windowed<String>, Long>> results) {
driver.setUp(builder, TestUtils.tempDirectory(), 0);
driver.setTime(0);
