Browse Source

KAFKA-5921; add Materialized overloads to windowed kstream

Add `Materialized` overloads to `WindowedKStream`. Deprecate existing methods on `KGroupedStream`

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #3889 from dguy/kafka-5921
pull/3892/merge
Damian Guy 7 years ago
parent
commit
c8f1471992
  1. 8
      streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
  2. 150
      streams/src/main/java/org/apache/kafka/streams/kstream/WindowedKStream.java
  3. 15
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
  4. 32
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
  5. 95
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java
  6. 6
      streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
  7. 109
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImplTest.java

8
streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java

@ -667,7 +667,9 @@ public interface KGroupedStream<K, V> { @@ -667,7 +667,9 @@ public interface KGroupedStream<K, V> {
* alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer, Windows)}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
* @deprecated use {@link #windowedBy(Windows)}
*/
@Deprecated
<W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final Windows<W> windows,
final String queryableStoreName);
@ -772,7 +774,9 @@ public interface KGroupedStream<K, V> { @@ -772,7 +774,9 @@ public interface KGroupedStream<K, V> {
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
* @deprecated use {@link #windowedBy(Windows)}
*/
@Deprecated
<W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final Windows<W> windows,
final StateStoreSupplier<WindowStore> storeSupplier);
@ -1259,7 +1263,9 @@ public interface KGroupedStream<K, V> { @@ -1259,7 +1263,9 @@ public interface KGroupedStream<K, V> {
* alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Windows, Serde)}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
* @deprecated use {@link #windowedBy(Windows)}
*/
@Deprecated
<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Windows<W> windows,
@ -1369,7 +1375,9 @@ public interface KGroupedStream<K, V> { @@ -1369,7 +1375,9 @@ public interface KGroupedStream<K, V> {
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
* @deprecated use {@link #windowedBy(Windows)}
*/
@Deprecated
<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Windows<W> windows,

150
streams/src/main/java/org/apache/kafka/streams/kstream/WindowedKStream.java

@ -16,11 +16,13 @@ @@ -16,11 +16,13 @@
*/
package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.WindowStore;
/**
* {@code WindowedKStream} is an abstraction of a <i>windowed</i> record stream of {@link KeyValue} pairs.
@ -31,6 +33,14 @@ import org.apache.kafka.streams.state.KeyValueStore; @@ -31,6 +33,14 @@ import org.apache.kafka.streams.state.KeyValueStore;
* new (partitioned) windows resulting in a windowed {@link KTable}
* (a <em>windowed</em> {@code KTable} is a {@link KTable} with key type {@link Windowed Windowed<K>}).
* <p>
* The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
* {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
* The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
* materialized view) that can be queried using the name provided in the {@link Materialized} instance.
* Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
* Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
* "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
* A {@code WindowedKStream} must be obtained from a {@link KGroupedStream} via {@link KGroupedStream#windowedBy(Windows)}.
*
* @param <K> Type of keys
@ -43,13 +53,6 @@ public interface WindowedKStream<K, V> { @@ -43,13 +53,6 @@ public interface WindowedKStream<K, V> {
/**
* Count the number of records in this stream by the grouped key and the defined windows.
* Records with {@code null} key or value are ignored.
* The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
* {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
* The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
* materialized view) that can be queried using the provided {@code queryableName}.
* Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
* Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
* "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same window and key.
@ -70,6 +73,38 @@ public interface WindowedKStream<K, V> { @@ -70,6 +73,38 @@ public interface WindowedKStream<K, V> {
*/
KTable<Windowed<K>, Long> count();
/**
* Count the number of records in this stream by the grouped key and the defined windows.
* Records with {@code null} key or value are ignored.
* <p>
* Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
* the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
*
* <p>
* To query the local windowed {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
* String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
* ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
*
* String key = "some-word";
* long fromTime = ...;
* long toTime = ...;
* WindowStoreIterator<Long> countForWordsForWindows = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
* @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
* represent the latest (rolling) count (i.e., number of records) for each key
*/
KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized);
/**
* Aggregate the values of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored.
@ -84,9 +119,12 @@ public interface WindowedKStream<K, V> { @@ -84,9 +119,12 @@ public interface WindowedKStream<K, V> {
* The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
* Thus, {@code aggregate(Initializer, Aggregator, Serde)} can be used to compute aggregate functions like
* Thus, {@code aggregate(Initializer, Aggregator)} can be used to compute aggregate functions like
* count (c.f. {@link #count()}).
* <p>
* The default value serde from config will be used for serializing the result.
* If a different serde is required then you should use {@link #aggregate(Initializer, Aggregator, Materialized)}.
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
@ -102,17 +140,63 @@ public interface WindowedKStream<K, V> { @@ -102,17 +140,63 @@ public interface WindowedKStream<K, V> {
* Note that the internal store name may not be queryable through Interactive Queries.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
* @param <VR> the value type of the resulting {@link KTable}
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
* @param aggregator an {@link Aggregator} that computes a new aggregate result
* @param aggValueSerde aggregate value serdes for materializing the aggregated table,
* if not specified the default serdes defined in the configs will be used
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
*/
<VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator);
/**
* Aggregate the values of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored.
* Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
* allows the result to have a different type than the input values.
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* that can be queried using the store name as provided with {@link Materialized}.
* <p>
* The specified {@link Initializer} is applied once directly before the first input record is processed to
* provide an initial intermediate aggregation result that is used to process the first record.
* The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
* Thus, {@code aggregate(Initializer, Aggregator, Materialized)} can be used to compute aggregate functions like
* count (c.f. {@link #count()}).
* <p>
* Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
* the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
*
* <p>
* To query the local windowed {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
* String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
* ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
*
* String key = "some-word";
* long fromTime = ...;
* long toTime = ...;
* WindowStoreIterator<Long> aggregateStore = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
*
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
* @param aggregator an {@link Aggregator} that computes a new aggregate result
* @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
* @param <VR> the value type of the resulting {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
*/
<VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Serde<VR> aggValueSerde);
final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized);
/**
* Combine the values of records in this stream by the grouped key.
@ -147,4 +231,46 @@ public interface WindowedKStream<K, V> { @@ -147,4 +231,46 @@ public interface WindowedKStream<K, V> {
* latest (rolling) aggregate for each key
*/
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);
/**
* Combine the values of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored.
* Combining implies that the type of the aggregate result is the same as the type of the input value.
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* that can be queried using the store name as provided with {@link Materialized}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
* <p>
* The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
* aggregate and the record's value.
* If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
* value as-is.
* Thus, {@code reduce(Reducer, Materialized)} can be used to compute aggregate functions like sum, min, or max.
* <p>
* Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
* the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local windowed {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
* String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
* ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
*
* String key = "some-word";
* long fromTime = ...;
* long toTime = ...;
* WindowStoreIterator<Long> reduceStore = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
*
* @param reducer a {@link Reducer} that computes a new aggregate result
* @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
*/
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
}

15
streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java

@ -17,6 +17,8 @@ @@ -17,6 +17,8 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.state.StoreBuilder;
@ -31,6 +33,19 @@ class GroupedStreamAggregateBuilder<K, V> { @@ -31,6 +33,19 @@ class GroupedStreamAggregateBuilder<K, V> {
private final Set<String> sourceNodes;
private final String name;
/**
 * Initializer shared by the count operations: every count starts at zero.
 */
final Initializer<Long> countInitializer = new Initializer<Long>() {
    @Override
    public Long apply() {
        final long zero = 0L;
        return zero;
    }
};
/**
 * Aggregator shared by the count operations: adds one to the running count for each record.
 * The record's key and value are not inspected.
 */
final Aggregator<K, V, Long> countAggregator = new Aggregator<K, V, Long>() {
    @Override
    public Long apply(final K key, final V record, final Long runningCount) {
        final long incremented = runningCount + 1;
        return incremented;
    }
};
GroupedStreamAggregateBuilder(final InternalStreamsBuilder builder,
final Serde<K> keySerde,
final Serde<V> valueSerde,

32
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java

@ -49,18 +49,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre @@ -49,18 +49,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
private final Serde<K> keySerde;
private final Serde<V> valSerde;
private final boolean repartitionRequired;
private final Initializer<Long> countInitializer = new Initializer<Long>() {
@Override
public Long apply() {
return 0L;
}
};
private final Aggregator<K, V, Long> countAggregator = new Aggregator<K, V, Long>() {
@Override
public Long apply(K aggKey, V value, Long aggregate) {
return aggregate + 1;
}
};
private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
private boolean isQueryable = true;
@ -235,7 +223,10 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre @@ -235,7 +223,10 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
final Aggregator<? super K, ? super V, T> aggregator,
final Windows<W> windows,
final Serde<T> aggValueSerde) {
return windowedBy(windows).aggregate(initializer, aggregator, aggValueSerde);
return windowedBy(windows).aggregate(initializer, aggregator,
Materialized.<K, T, WindowStore<Bytes, byte[]>>as(builder.newStoreName(AGGREGATE_NAME))
.withKeySerde(keySerde)
.withValueSerde(aggValueSerde));
}
@SuppressWarnings("unchecked")
@ -268,12 +259,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre @@ -268,12 +259,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
@Override
public KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier) {
return aggregate(countInitializer, countAggregator, storeSupplier);
return aggregate(aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, storeSupplier);
}
@Override
public KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
return aggregate(countInitializer, countAggregator, materialized);
return aggregate(aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, materialized);
}
@Override
@ -292,8 +283,8 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre @@ -292,8 +283,8 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
final StateStoreSupplier<WindowStore> storeSupplier) {
return aggregate(
countInitializer,
countAggregator,
aggregateBuilder.countInitializer,
aggregateBuilder.countAggregator,
windows,
storeSupplier);
}
@ -383,7 +374,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre @@ -383,7 +374,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
}
};
return aggregate(countInitializer, countAggregator, sessionMerger, sessionWindows, Serdes.Long(), storeSupplier);
return aggregate(aggregateBuilder.countInitializer,
aggregateBuilder.countAggregator,
sessionMerger,
sessionWindows,
Serdes.Long(),
storeSupplier);
}

95
streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java

@ -18,9 +18,11 @@ package org.apache.kafka.streams.kstream.internals; @@ -18,9 +18,11 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
@ -28,6 +30,7 @@ import org.apache.kafka.streams.kstream.WindowedKStream; @@ -28,6 +30,7 @@ import org.apache.kafka.streams.kstream.WindowedKStream;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import java.util.Objects;
@ -60,35 +63,56 @@ public class WindowedKStreamImpl<K, V, W extends Window> extends AbstractStream< @@ -60,35 +63,56 @@ public class WindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<
@Override
public KTable<Windowed<K>, Long> count() {
return aggregate(
new Initializer<Long>() {
@Override
public Long apply() {
return 0L;
}
}, new Aggregator<K, V, Long>() {
@Override
public Long apply(K aggKey, V value, Long aggregate) {
return aggregate + 1;
}
},
return doAggregate(
aggregateBuilder.countInitializer,
aggregateBuilder.countAggregator,
Serdes.Long());
}
/**
 * Counts records per key and window, materializing the result as described by {@code materialized}.
 * Delegates to {@link #aggregate(Initializer, Aggregator, Materialized)} using the count initializer
 * and count aggregator held by the aggregate builder.
 *
 * @param materialized describes the store to materialize; must not be {@code null}
 * @return the windowed {@link KTable} returned by the delegated aggregate call
 * @throws NullPointerException if {@code materialized} is {@code null}
 */
@SuppressWarnings("unchecked")
@Override
public KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
    final Materialized<K, Long, WindowStore<Bytes, byte[]>> countStore =
            Objects.requireNonNull(materialized, "materialized can't be null");
    return aggregate(aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, countStore);
}
@Override
public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Serde<VR> aggValueSerde) {
final Aggregator<? super K, ? super V, VR> aggregator) {
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(aggregator, "aggregator can't be null");
return doAggregate(initializer, aggregator, null);
}
@SuppressWarnings("unchecked")
private <VR> KTable<Windowed<K>, VR> doAggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Serde<VR> serde) {
final String storeName = builder.newStoreName(AGGREGATE_NAME);
return (KTable<Windowed<K>, VR>) aggregateBuilder.build(new KStreamWindowAggregate<>(windows, storeName, initializer, aggregator),
AGGREGATE_NAME,
windowStoreBuilder(storeName, aggValueSerde),
windowStoreBuilder(storeName, serde),
false);
}
/**
 * Aggregates records per key and window into the store described by {@code materialized}.
 *
 * @param initializer  supplies the initial aggregate; must not be {@code null}
 * @param aggregator   folds each record into the current aggregate; must not be {@code null}
 * @param materialized describes the store to materialize; must not be {@code null}
 * @param <VR>         the value type of the resulting {@link KTable}
 * @return the windowed {@link KTable} produced by the aggregate builder
 * @throws NullPointerException if any argument is {@code null}
 */
@SuppressWarnings("unchecked")
@Override
public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                              final Aggregator<? super K, ? super V, VR> aggregator,
                                              final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized) {
    Objects.requireNonNull(initializer, "initializer can't be null");
    Objects.requireNonNull(aggregator, "aggregator can't be null");
    Objects.requireNonNull(materialized, "materialized can't be null");

    final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> internal = new MaterializedInternal<>(materialized);
    // The processor and the store builder must agree on the store name, so it is read once here.
    final String storeName = internal.storeName();
    return (KTable<Windowed<K>, VR>) aggregateBuilder.build(
            new KStreamWindowAggregate<>(windows, storeName, initializer, aggregator),
            AGGREGATE_NAME,
            materialize(internal),
            true);
}
@SuppressWarnings("unchecked")
@Override
public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) {
@ -97,10 +121,49 @@ public class WindowedKStreamImpl<K, V, W extends Window> extends AbstractStream< @@ -97,10 +121,49 @@ public class WindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<
return (KTable<Windowed<K>, V>) aggregateBuilder.build(new KStreamWindowReduce<K, V, W>(windows, storeName, reducer),
REDUCE_NAME,
windowStoreBuilder(storeName, valSerde),
true);
}
/**
 * Combines the values per key and window with {@code reducer}, storing the result in the window
 * store described by {@code materialized}.
 *
 * @param reducer      combines the current aggregate with a record's value; must not be {@code null}
 * @param materialized describes the store to materialize; must not be {@code null}
 * @return the windowed {@link KTable} produced by the aggregate builder
 * @throws NullPointerException if {@code reducer} or {@code materialized} is {@code null}
 */
@SuppressWarnings("unchecked")
@Override
public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
    Objects.requireNonNull(reducer, "reducer can't be null");
    Objects.requireNonNull(materialized, "materialized can't be null");

    final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
    final String storeName = materializedInternal.storeName();
    // The store is named by the caller through Materialized, so the final flag is passed as true,
    // exactly as aggregate(Initializer, Aggregator, Materialized) does for its caller-named store.
    // It was previously false, the value used for an internally named aggregate store.
    // NOTE(review): presumably this is the "queryable" flag of GroupedStreamAggregateBuilder#build - confirm.
    return (KTable<Windowed<K>, V>) aggregateBuilder.build(
            new KStreamWindowReduce<K, V, W>(windows, storeName, reducer),
            REDUCE_NAME,
            materialize(materializedInternal),
            true);
}
/**
 * Creates the window store builder described by {@code materialized}.
 * <p>
 * When {@code materialized} carries no store supplier, a persistent window store supplier is created
 * from the store name and this stream's window definition. Logging is enabled with the configured
 * log config or explicitly disabled, and caching is switched on only when requested.
 *
 * @param materialized the internal view of the user-supplied materialization settings
 * @param <VR>         the value type held by the store
 * @return the configured store builder
 */
private <VR> StoreBuilder<WindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) {
    final WindowBytesStoreSupplier provided = (WindowBytesStoreSupplier) materialized.storeSupplier();
    final WindowBytesStoreSupplier supplier = provided != null
            ? provided
            : Stores.persistentWindowStore(materialized.storeName(),
                                           windows.maintainMs(),
                                           windows.segments,
                                           windows.size(),
                                           false);

    final StoreBuilder<WindowStore<K, VR>> storeBuilder =
            Stores.windowStoreBuilder(supplier, materialized.keySerde(), materialized.valueSerde());

    if (!materialized.loggingEnabled()) {
        storeBuilder.withLoggingDisabled();
    } else {
        storeBuilder.withLoggingEnabled(materialized.logConfig());
    }

    if (materialized.cachingEnabled()) {
        storeBuilder.withCachingEnabled();
    }
    return storeBuilder;
}
private <VR> StoreBuilder<WindowStore<K, VR>> windowStoreBuilder(final String storeName, final Serde<VR> aggValueSerde) {
return Stores.windowStoreBuilder(
Stores.persistentWindowStore(

6
streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java

@ -110,6 +110,8 @@ public class KStreamAggregationIntegrationTest { @@ -110,6 +110,8 @@ public class KStreamAggregationIntegrationTest {
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
final KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.SelectValueMapper();
stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String()));
@ -306,8 +308,8 @@ public class KStreamAggregationIntegrationTest { @@ -306,8 +308,8 @@ public class KStreamAggregationIntegrationTest {
groupedStream.windowedBy(TimeWindows.of(500L))
.aggregate(
initializer,
aggregator,
Serdes.Integer())
aggregator
)
.toStream(new KeyValueMapper<Windowed<String>, Integer, String>() {
@Override
public String apply(final Windowed<String> windowedKey, final Integer value) {

109
streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImplTest.java

@ -18,24 +18,31 @@ @@ -18,24 +18,31 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedKStream;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
@ -76,7 +83,6 @@ public class WindowedKStreamImplTest { @@ -76,7 +83,6 @@ public class WindowedKStreamImplTest {
}
@Test
public void shouldReduceWindowed() {
final Map<Windowed<String>, String> results = new HashMap<>();
@ -99,8 +105,8 @@ public class WindowedKStreamImplTest { @@ -99,8 +105,8 @@ public class WindowedKStreamImplTest {
public void shouldAggregateWindowed() {
final Map<Windowed<String>, String> results = new HashMap<>();
windowedStream.aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
Serdes.String())
MockAggregator.TOSTRING_ADDER
)
.toStream()
.foreach(new ForeachAction<Windowed<String>, String>() {
@Override
@ -114,14 +120,66 @@ public class WindowedKStreamImplTest { @@ -114,14 +120,66 @@ public class WindowedKStreamImplTest {
assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("0+3"));
}
/**
 * Counting with a named {@link Materialized} must populate the "count-store" window store
 * with one count per key and window.
 */
@SuppressWarnings("unchecked")
@Test
public void shouldMaterializeCount() {
    final Materialized<String, Long, WindowStore<Bytes, byte[]>> countStore =
            Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("count-store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Long());
    windowedStream.count(countStore);

    processData();

    final WindowStore<String, Long> store = (WindowStore<String, Long>) driver.allStateStores().get("count-store");
    final List<KeyValue<Windowed<String>, Long>> actual = StreamsTestUtils.toList(store.fetch("1", "2", 0, 1000));
    final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
            KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L),
            KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
            KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L));
    assertThat(actual, equalTo(expected));
}
/**
 * Reducing with a named {@link Materialized} must populate the "reduced" window store
 * with the combined value per key and window.
 */
@SuppressWarnings("unchecked")
@Test
public void shouldMaterializeReduced() {
    final Materialized<String, String, WindowStore<Bytes, byte[]>> reducedStore =
            Materialized.<String, String, WindowStore<Bytes, byte[]>>as("reduced")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.String());
    windowedStream.reduce(MockReducer.STRING_ADDER, reducedStore);

    processData();

    final WindowStore<String, String> store = (WindowStore<String, String>) driver.allStateStores().get("reduced");
    final List<KeyValue<Windowed<String>, String>> actual = StreamsTestUtils.toList(store.fetch("1", "2", 0, 1000));
    final List<KeyValue<Windowed<String>, String>> expected = Arrays.asList(
            KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "1+2"),
            KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "3"),
            KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "1"));
    assertThat(actual, equalTo(expected));
}
/**
 * Aggregating with a named {@link Materialized} must populate the "aggregated" window store
 * with the aggregate per key and window.
 */
@SuppressWarnings("unchecked")
@Test
public void shouldMaterializeAggregated() {
    final Materialized<String, String, WindowStore<Bytes, byte[]>> aggregatedStore =
            Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.String());
    windowedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, aggregatedStore);

    processData();

    final WindowStore<String, String> store = (WindowStore<String, String>) driver.allStateStores().get("aggregated");
    final List<KeyValue<Windowed<String>, String>> actual = StreamsTestUtils.toList(store.fetch("1", "2", 0, 1000));
    final List<KeyValue<Windowed<String>, String>> expected = Arrays.asList(
            KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "0+1+2"),
            KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "0+3"),
            KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+1"));
    assertThat(actual, equalTo(expected));
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
windowedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, Serdes.String());
windowedStream.aggregate(null, MockAggregator.TOSTRING_ADDER);
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull() {
windowedStream.aggregate(MockInitializer.STRING_INIT, null, Serdes.String());
windowedStream.aggregate(MockInitializer.STRING_INIT, null);
}
@Test(expected = NullPointerException.class)
@ -129,8 +187,47 @@ public class WindowedKStreamImplTest { @@ -129,8 +187,47 @@ public class WindowedKStreamImplTest {
windowedStream.reduce(null);
}
/**
 * The materialized aggregate overload must reject a {@code null} initializer.
 */
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull() {
    final Materialized<String, String, WindowStore<Bytes, byte[]>> store =
            Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store");
    windowedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, store);
}
/**
 * The materialized aggregate overload must reject a {@code null} aggregator.
 */
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull() {
    final Materialized<String, String, WindowStore<Bytes, byte[]>> store =
            Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store");
    windowedStream.aggregate(MockInitializer.STRING_INIT, null, store);
}
/**
 * The materialized aggregate overload must reject a {@code null} {@link Materialized}.
 * The raw-typed local mirrors the raw cast the call needs to select that overload.
 */
@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull() {
    final Materialized noStore = null;
    windowedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, noStore);
}
/**
 * The materialized reduce overload must reject a {@code null} reducer.
 */
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() {
    final Materialized<String, String, WindowStore<Bytes, byte[]>> store =
            Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store");
    windowedStream.reduce(null, store);
}
/**
 * The materialized reduce overload must reject a {@code null} {@link Materialized}.
 */
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
    final Materialized<String, String, WindowStore<Bytes, byte[]>> noStore = null;
    windowedStream.reduce(MockReducer.STRING_ADDER, noStore);
}
/**
 * The materialized count overload must reject a {@code null} {@link Materialized}.
 */
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnCountIfMaterializedIsNull() {
    final Materialized<String, Long, WindowStore<Bytes, byte[]>> noStore = null;
    windowedStream.count(noStore);
}
private void processData() {
driver.setUp(builder, TestUtils.tempDirectory());
driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), 0);
driver.setTime(10);
driver.process(TOPIC, "1", "1");
driver.setTime(15);

Loading…
Cancel
Save