Browse Source

KAFKA-5655; materialized count, aggregate, reduce to KGroupedTable

Add overloads of `count`, `aggregate`, `reduce` using `Materialized` to `KGroupedTable`
deprecate other overloads

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3829 from dguy/kafka-5655
pull/3829/merge
Damian Guy 7 years ago
parent
commit
8bd2a68b50
  1. 204
      streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
  2. 12
      streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
  3. 134
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
  4. 54
      streams/src/test/java/org/apache/kafka/streams/kstream/MaterializedTest.java
  5. 137
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
  6. 1
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java

204
streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java

@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream; @@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateStoreSupplier;
@ -80,9 +81,52 @@ public interface KGroupedTable<K, V> { @@ -80,9 +81,52 @@ public interface KGroupedTable<K, V> {
* alphanumerics, '.', '_' and '-'. If {@code null} this is the equivalent of {@link KGroupedTable#count()}.
* @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
* represent the latest (rolling) count (i.e., number of records) for each key
* @deprecated use {@link #count(Materialized)}
*/
@Deprecated
KTable<K, Long> count(final String queryableStoreName);
/**
* Count number of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) mapped} to
* the same key into a new instance of {@link KTable}.
* Records with {@code null} key are ignored.
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* that can be queried using the provided {@code queryableStoreName}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
* To query the local {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
* ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-word";
* Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
* provide {@code queryableStoreName}, and "-changelog" is a fixed suffix.
* The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics,
* '.', '_' and '-'.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
* @param materialized the instance of {@link Materialized} used to materialize the state store. Cannot be {@code null}
* @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
* represent the latest (rolling) count (i.e., number of records) for each key
*/
KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Count number of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) mapped} to
* the same key into a new instance of {@link KTable}.
@ -148,7 +192,9 @@ public interface KGroupedTable<K, V> { @@ -148,7 +192,9 @@ public interface KGroupedTable<K, V> {
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
* represent the latest (rolling) count (i.e., number of records) for each key
* @deprecated use {@link #count(Materialized)}
*/
@Deprecated
KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
/**
@ -218,11 +264,83 @@ public interface KGroupedTable<K, V> { @@ -218,11 +264,83 @@ public interface KGroupedTable<K, V> {
* '.', '_' and '-'. If {@code null} this is the equivalent of {@link KGroupedTable#reduce(Reducer, Reducer)} ()}.
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
* @deprecated use {@link #reduce(Reducer, Reducer, Materialized)}
*/
@Deprecated
KTable<K, V> reduce(final Reducer<V> adder,
final Reducer<V> subtractor,
final String queryableStoreName);
/**
* Combine the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
* mapped} to the same key into a new instance of {@link KTable}.
* Records with {@code null} key are ignored.
* Combining implies that the type of the aggregate result is the same as the type of the input value
* (c.f. {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)}).
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* that can be queried using the provided {@code queryableStoreName}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
* <p>
* Each update to the original {@link KTable} results in a two step update of the result {@link KTable}.
* The specified {@link Reducer adder} is applied for each update record and computes a new aggregate using the
* current aggregate (first argument) and the record's value (second argument) by adding the new record to the
* aggregate.
* The specified {@link Reducer substractor} is applied for each "replaced" record of the original {@link KTable}
* and computes a new aggregate using the current aggregate (first argument) and the record's value (second
* argument) by "removing" the "replaced" record from the aggregate.
* If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
* value as-is.
* Thus, {@code reduce(Reducer, Reducer, String)} can be used to compute aggregate functions like sum.
* For sum, the adder and substractor would work as follows:
* <pre>{@code
* public class SumAdder implements Reducer<Integer> {
* public Integer apply(Integer currentAgg, Integer newValue) {
* return currentAgg + newValue;
* }
* }
*
* public class SumSubtractor implements Reducer<Integer> {
* public Integer apply(Integer currentAgg, Integer oldValue) {
* return currentAgg - oldValue;
* }
* }
* }</pre>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
* To query the local {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
* ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-word";
* Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
* provide {@code queryableStoreName}, and "-changelog" is a fixed suffix.
* The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics,
* '.', '_' and '-'.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
* @param adder a {@link Reducer} that adds a new value to the aggregate result
* @param subtractor a {@link Reducer} that removed an old value from the aggregate result
* @param materialized the instance of {@link Materialized} used to materialize the state store. Cannot be {@code null}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
*/
KTable<K, V> reduce(final Reducer<V> adder,
final Reducer<V> subtractor,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Combine the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
* mapped} to the same key into a new instance of {@link KTable}.
@ -344,7 +462,9 @@ public interface KGroupedTable<K, V> { @@ -344,7 +462,9 @@ public interface KGroupedTable<K, V> {
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
* @deprecated use {@link #reduce(Reducer, Reducer, Materialized)}
*/
@Deprecated
KTable<K, V> reduce(final Reducer<V> adder,
final Reducer<V> subtractor,
final StateStoreSupplier<KeyValueStore> storeSupplier);
@ -427,12 +547,94 @@ public interface KGroupedTable<K, V> { @@ -427,12 +547,94 @@ public interface KGroupedTable<K, V> {
* @param <VR> the value type of the aggregated {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
* @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)}
*/
@Deprecated
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> adder,
final Aggregator<? super K, ? super V, VR> subtractor,
final String queryableStoreName);
/**
* Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
* mapped} to the same key into a new instance of {@link KTable} using default serializers and deserializers.
* Records with {@code null} key are ignored.
* Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it,
* for example, allows the result to have a different type than the input values.
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* provided by the given {@code storeSupplier}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
* <p>
* The specified {@link Initializer} is applied once directly before the first input record is processed to
* provide an initial intermediate aggregation result that is used to process the first record.
* Each update to the original {@link KTable} results in a two step update of the result {@link KTable}.
* The specified {@link Aggregator adder} is applied for each update record and computes a new aggregate using the
* current aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value by adding the new record to the aggregate.
* The specified {@link Aggregator substractor} is applied for each "replaced" record of the original {@link KTable}
* and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced"
* record from the aggregate.
* Thus, {@code aggregate(Initializer, Aggregator, Aggregator, String)} can be used to compute aggregate functions
* like sum.
* For sum, the initializer, adder, and substractor would work as follows:
* <pre>{@code
* // in this example, LongSerde.class must be set as default value serde in StreamsConfig
* public class SumInitializer implements Initializer<Long> {
* public Long apply() {
* return 0L;
* }
* }
*
* public class SumAdder implements Aggregator<String, Integer, Long> {
* public Long apply(String key, Integer newValue, Long aggregate) {
* return aggregate + newValue;
* }
* }
*
* public class SumSubstractor implements Aggregator<String, Integer, Long> {
* public Long apply(String key, Integer oldValue, Long aggregate) {
* return aggregate - oldValue;
* }
* }
* }</pre>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
* To query the local {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
* ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-word";
* Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
* provide {@code queryableStoreName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
* @param initializer an {@link Initializer} that provides an initial aggregate result value
* @param adder an {@link Aggregator} that adds a new record to the aggregate result
* @param subtractor an {@link Aggregator} that removed an old record from the aggregate result
* @param materialized the instance of {@link Materialized} used to materialize the state store. Cannot be {@code null}
* @param <VR> the value type of the aggregated {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
*/
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> adder,
final Aggregator<? super K, ? super V, VR> subtractor,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
* mapped} to the same key into a new instance of {@link KTable} using default serializers and deserializers.
@ -582,7 +784,9 @@ public interface KGroupedTable<K, V> { @@ -582,7 +784,9 @@ public interface KGroupedTable<K, V> {
* @param <VR> the value type of the aggregated {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
* @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)}
*/
@Deprecated
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> adder,
final Aggregator<? super K, ? super V, VR> subtractor,

12
streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java

@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.StateStore;
@ -29,6 +30,7 @@ import org.apache.kafka.streams.state.WindowStore; @@ -29,6 +30,7 @@ import org.apache.kafka.streams.state.WindowStore;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/**
* Used to describe how a {@link StateStore} should be materialized.
@ -70,13 +72,15 @@ public class Materialized<K, V, S extends StateStore> { @@ -70,13 +72,15 @@ public class Materialized<K, V, S extends StateStore> {
/**
* Materialize a {@link StateStore} with the given name.
*
* @param storeName name of the store to materialize
* @param storeName the name of the underlying {@link KTable} state store; valid characters are ASCII
* alphanumerics, '.', '_' and '-'.
* @param <K> key type of the store
* @param <V> value type of the store
* @param <S> type of the {@link StateStore}
* @return a new {@link Materialized} instance with the given storeName
*/
public static <K, V, S extends StateStore> Materialized<K, V, S> as(final String storeName) {
Topic.validate(storeName);
return new Materialized<>(storeName);
}
@ -89,6 +93,7 @@ public class Materialized<K, V, S extends StateStore> { @@ -89,6 +93,7 @@ public class Materialized<K, V, S extends StateStore> {
* @return a new {@link Materialized} instance with the given supplier
*/
public static <K, V> Materialized<K, V, WindowStore<Bytes, byte[]>> as(final WindowBytesStoreSupplier supplier) {
Objects.requireNonNull(supplier, "supplier can't be null");
return new Materialized<>(supplier);
}
@ -98,9 +103,11 @@ public class Materialized<K, V, S extends StateStore> { @@ -98,9 +103,11 @@ public class Materialized<K, V, S extends StateStore> {
* @param supplier the {@link SessionBytesStoreSupplier} used to materialize the store
* @param <K> key type of the store
* @param <V> value type of the store
* @return a new {@link Materialized} instance with the given supplier
* @return a new {@link Materialized} instance with the given sup
* plier
*/
public static <K, V> Materialized<K, V, SessionStore<Bytes, byte[]>> as(final SessionBytesStoreSupplier supplier) {
Objects.requireNonNull(supplier, "supplier can't be null");
return new Materialized<>(supplier);
}
@ -113,6 +120,7 @@ public class Materialized<K, V, S extends StateStore> { @@ -113,6 +120,7 @@ public class Materialized<K, V, S extends StateStore> {
* @return a new {@link Materialized} instance with the given supplier
*/
public static <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> as(final KeyValueBytesStoreSupplier supplier) {
Objects.requireNonNull(supplier, "supplier can't be null");
return new Materialized<>(supplier);
}

134
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java

@ -20,6 +20,8 @@ import org.apache.kafka.common.serialization.Deserializer; @@ -20,6 +20,8 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Initializer;
@ -48,6 +50,26 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup @@ -48,6 +50,26 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
protected final Serde<? extends K> keySerde;
protected final Serde<? extends V> valSerde;
private boolean isQueryable = true;
private final Initializer<Long> countInitializer = new Initializer<Long>() {
@Override
public Long apply() {
return 0L;
}
};
private final Aggregator<K, V, Long> countAdder = new Aggregator<K, V, Long>() {
@Override
public Long apply(K aggKey, V value, Long aggregate) {
return aggregate + 1L;
}
};
private Aggregator<K, V, Long> countSubtractor = new Aggregator<K, V, Long>() {
@Override
public Long apply(K aggKey, V value, Long aggregate) {
return aggregate - 1L;
}
};
public KGroupedTableImpl(final InternalStreamsBuilder builder,
final String name,
@ -116,19 +138,33 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup @@ -116,19 +138,33 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
final String functionName,
final StateStoreSupplier<KeyValueStore> storeSupplier) {
String sinkName = builder.newName(KStreamImpl.SINK_NAME);
String sourceName = builder.newName(KStreamImpl.SOURCE_NAME);
String funcName = builder.newName(functionName);
final String sinkName = builder.newName(KStreamImpl.SINK_NAME);
final String sourceName = builder.newName(KStreamImpl.SOURCE_NAME);
final String funcName = builder.newName(functionName);
buildAggregate(aggregateSupplier,
storeSupplier.name() + KStreamImpl.REPARTITION_TOPIC_SUFFIX,
funcName,
sourceName,
sinkName);
builder.internalTopologyBuilder.addStateStore(storeSupplier, funcName);
String topic = storeSupplier.name() + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
// return the KTable representation with the intermediate topic as the sources
return new KTableImpl<>(builder, funcName, aggregateSupplier, Collections.singleton(sourceName), storeSupplier.name(), isQueryable);
}
Serializer<? extends K> keySerializer = keySerde == null ? null : keySerde.serializer();
Deserializer<? extends K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
Serializer<? extends V> valueSerializer = valSerde == null ? null : valSerde.serializer();
Deserializer<? extends V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
private void buildAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
final String topic,
final String funcName,
final String sourceName,
final String sinkName) {
final Serializer<? extends K> keySerializer = keySerde == null ? null : keySerde.serializer();
final Deserializer<? extends K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
final Serializer<? extends V> valueSerializer = valSerde == null ? null : valSerde.serializer();
final Deserializer<? extends V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
ChangedSerializer<? extends V> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
ChangedDeserializer<? extends V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
final ChangedSerializer<? extends V> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
final ChangedDeserializer<? extends V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
// send the aggregate key-value pairs to the intermediate topic for partitioning
builder.internalTopologyBuilder.addInternalTopic(topic);
@ -139,10 +175,23 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup @@ -139,10 +175,23 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
// aggregate the values with the aggregator and local store
builder.internalTopologyBuilder.addProcessor(funcName, aggregateSupplier, sourceName);
builder.internalTopologyBuilder.addStateStore(storeSupplier, funcName);
}
private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
final String functionName,
final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized) {
final String sinkName = builder.newName(KStreamImpl.SINK_NAME);
final String sourceName = builder.newName(KStreamImpl.SOURCE_NAME);
final String funcName = builder.newName(functionName);
buildAggregate(aggregateSupplier,
materialized.storeName() + KStreamImpl.REPARTITION_TOPIC_SUFFIX,
funcName,
sourceName, sinkName);
builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materialized).materialize(), funcName);
// return the KTable representation with the intermediate topic as the sources
return new KTableImpl<>(builder, funcName, aggregateSupplier, Collections.singleton(sourceName), storeSupplier.name(), isQueryable);
return new KTableImpl<>(builder, funcName, aggregateSupplier, Collections.singleton(sourceName), materialized.storeName(), isQueryable);
}
@Override
@ -153,6 +202,21 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup @@ -153,6 +202,21 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
return reduce(adder, subtractor, keyValueStore(keySerde, valSerde, getOrCreateName(queryableStoreName, REDUCE_NAME)));
}
@Override
public KTable<K, V> reduce(final Reducer<V> adder,
final Reducer<V> subtractor,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(adder, "adder can't be null");
Objects.requireNonNull(subtractor, "subtractor can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal
= new MaterializedInternal<>(materialized);
final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(materializedInternal.storeName(),
adder,
subtractor);
return doAggregate(aggregateSupplier, REDUCE_NAME, materializedInternal);
}
@Override
public KTable<K, V> reduce(final Reducer<V> adder,
final Reducer<V> subtractor) {
@ -176,6 +240,32 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup @@ -176,6 +240,32 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
return count(keyValueStore(keySerde, Serdes.Long(), getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
}
@Override
public KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
return aggregate(countInitializer,
countAdder,
countSubtractor,
materialized);
}
@Override
public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> adder,
final Aggregator<? super K, ? super V, VR> subtractor,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(adder, "adder can't be null");
Objects.requireNonNull(subtractor, "subtractor can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(materialized);
final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(materializedInternal.storeName(),
initializer,
adder,
subtractor);
return doAggregate(aggregateSupplier, AGGREGATE_NAME, materializedInternal);
}
@Override
public KTable<K, Long> count() {
return count((String) null);
@ -184,23 +274,9 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup @@ -184,23 +274,9 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
@Override
public KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier) {
return this.aggregate(
new Initializer<Long>() {
@Override
public Long apply() {
return 0L;
}
},
new Aggregator<K, V, Long>() {
@Override
public Long apply(K aggKey, V value, Long aggregate) {
return aggregate + 1L;
}
}, new Aggregator<K, V, Long>() {
@Override
public Long apply(K aggKey, V value, Long aggregate) {
return aggregate - 1L;
}
},
countInitializer,
countAdder,
countSubtractor,
storeSupplier);
}

54
streams/src/test/java/org/apache/kafka/streams/kstream/MaterializedTest.java

@ -0,0 +1,54 @@ @@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.junit.Test;
public class MaterializedTest {
@Test
public void shouldAllowValidTopicNamesAsStoreName() {
Materialized.as("valid-name");
Materialized.as("valid.name");
Materialized.as("valid_name");
}
@Test(expected = InvalidTopicException.class)
public void shouldNotAllowInvalidTopicNames() {
Materialized.as("not:valid");
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerIfWindowBytesStoreSupplierIsNull() {
Materialized.as((WindowBytesStoreSupplier) null);
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerIfKeyValueBytesStoreSupplierIsNull() {
Materialized.as((KeyValueBytesStoreSupplier) null);
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerIfSessionBytesStoreSupplierIsNull() {
Materialized.as((SessionBytesStoreSupplier) null);
}
}

137
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java

@ -18,12 +18,15 @@ package org.apache.kafka.streams.kstream.internals; @@ -18,12 +18,15 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.KStreamTestDriver;
@ -39,10 +42,13 @@ import org.junit.Test; @@ -39,10 +42,13 @@ import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@SuppressWarnings("deprecation")
public class KGroupedTableImplTest {
private final StreamsBuilder builder = new StreamsBuilder();
@ -50,6 +56,7 @@ public class KGroupedTableImplTest { @@ -50,6 +56,7 @@ public class KGroupedTableImplTest {
private KGroupedTable<String, String> groupedTable;
@Rule
public final KStreamTestDriver driver = new KStreamTestDriver();
private final String topic = "input";
@Before
public void before() {
@ -142,7 +149,6 @@ public class KGroupedTableImplTest { @@ -142,7 +149,6 @@ public class KGroupedTableImplTest {
@Test
public void shouldReduce() {
final String topic = "input";
final KeyValueMapper<String, Number, KeyValue<String, Integer>> intProjection =
new KeyValueMapper<String, Number, KeyValue<String, Integer>>() {
@Override
@ -161,7 +167,6 @@ public class KGroupedTableImplTest { @@ -161,7 +167,6 @@ public class KGroupedTableImplTest {
@Test
public void shouldReduceWithInternalStoreName() {
final String topic = "input";
final KeyValueMapper<String, Number, KeyValue<String, Integer>> intProjection =
new KeyValueMapper<String, Number, KeyValue<String, Integer>>() {
@Override
@ -177,4 +182,132 @@ public class KGroupedTableImplTest { @@ -177,4 +182,132 @@ public class KGroupedTableImplTest {
doShouldReduce(reduced, topic);
assertNull(reduced.queryableStoreName());
}
@SuppressWarnings("unchecked")
@Test
public void shouldReduceAndMaterializeResults() {
final KeyValueMapper<String, Number, KeyValue<String, Integer>> intProjection =
new KeyValueMapper<String, Number, KeyValue<String, Integer>>() {
@Override
public KeyValue<String, Integer> apply(String key, Number value) {
return KeyValue.pair(key, value.intValue());
}
};
final KTable<String, Integer> reduced = builder.table(Serdes.String(), Serdes.Double(), topic, "store")
.groupBy(intProjection)
.reduce(MockReducer.INTEGER_ADDER,
MockReducer.INTEGER_SUBTRACTOR,
Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("reduce")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Integer()));
doShouldReduce(reduced, topic);
final KeyValueStore<String, Integer> reduce = (KeyValueStore<String, Integer>) driver.allStateStores().get("reduce");
assertThat(reduce.get("A"), equalTo(5));
assertThat(reduce.get("B"), equalTo(6));
}
@SuppressWarnings("unchecked")
@Test
public void shouldCountAndMaterializeResults() {
final KTable<String, String> table = builder.table(Serdes.String(), Serdes.String(), topic, "store");
table.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(),
Serialized.with(Serdes.String(),
Serdes.String()))
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long()));
processData(topic);
final KeyValueStore<String, Long> counts = (KeyValueStore<String, Long>) driver.allStateStores().get("count");
assertThat(counts.get("1"), equalTo(3L));
assertThat(counts.get("2"), equalTo(2L));
}
@SuppressWarnings("unchecked")
@Test
public void shouldAggregateAndMaterializeResults() {
final KTable<String, String> table = builder.table(Serdes.String(), Serdes.String(), topic, "store");
table.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(),
Serialized.with(Serdes.String(),
Serdes.String()))
.aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
MockAggregator.TOSTRING_REMOVER,
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("aggregate")
.withValueSerde(Serdes.String())
.withKeySerde(Serdes.String()));
processData(topic);
final KeyValueStore<String, String> aggregate = (KeyValueStore<String, String>) driver.allStateStores().get("aggregate");
assertThat(aggregate.get("1"), equalTo("0+1+1+1"));
assertThat(aggregate.get("2"), equalTo("0+2+2"));
}
@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointOnCountWhenMaterializedIsNull() {
groupedTable.count((Materialized) null);
}
@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnReduceWhenMaterializedIsNull() {
groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, (Materialized) null);
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnReduceWhenAdderIsNull() {
groupedTable.reduce(null, MockReducer.STRING_REMOVER, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnReduceWhenSubtractorIsNull() {
groupedTable.reduce(MockReducer.STRING_ADDER, null, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnAggregateWhenInitializerIsNull() {
groupedTable.aggregate(null,
MockAggregator.TOSTRING_ADDER,
MockAggregator.TOSTRING_REMOVER,
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnAggregateWhenAdderIsNull() {
groupedTable.aggregate(MockInitializer.STRING_INIT,
null,
MockAggregator.TOSTRING_REMOVER,
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnAggregateWhenSubtractorIsNull() {
groupedTable.aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
null,
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
}
@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnAggregateWhenMaterializedIsNull() {
groupedTable.aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
MockAggregator.TOSTRING_REMOVER,
(Materialized) null);
}
private void processData(final String topic) {
driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.Integer());
driver.setTime(0L);
driver.process(topic, "A", "1");
driver.process(topic, "B", "1");
driver.process(topic, "C", "1");
driver.process(topic, "D", "2");
driver.process(topic, "E", "2");
driver.flushState();
}
}

1
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java

@ -405,4 +405,5 @@ public class KTableAggregateTest { @@ -405,4 +405,5 @@ public class KTableAggregateTest {
driver.process("tableOne", "1", "5");
assertEquals(Long.valueOf(4L), reduceResults.get("2"));
}
}

Loading…
Cancel
Save