Browse Source

KAFKA-5931; deprecate KTable#through and KTable#to

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3903 from dguy/deprectate-to-through
pull/3903/merge
Damian Guy 7 years ago
parent
commit
37ec15e962
  1. 84
      streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java

84
streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java

@ -38,7 +38,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; @@ -38,7 +38,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
* {@code KTable} is an abstraction of a <i>changelog stream</i> from a primary-keyed table.
* Each record in this changelog stream is an update on the primary-keyed table with the record key as the primary key.
* <p>
* A {@code KTable} is either {@link StreamsBuilder#table(String, String) defined from a single Kafka topic} that is
* A {@code KTable} is either {@link StreamsBuilder#table(String) defined from a single Kafka topic} that is
* consumed message by message or the result of a {@code KTable} transformation.
* An aggregation of a {@link KStream} also yields a {@code KTable}.
* <p>
@ -66,7 +66,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; @@ -66,7 +66,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
* @see KStream
* @see KGroupedTable
* @see GlobalKTable
* @see StreamsBuilder#table(String, String)
* @see StreamsBuilder#table(String)
*/
@InterfaceStability.Evolving
public interface KTable<K, V> {
@ -763,17 +763,20 @@ public interface KTable<K, V> { @@ -763,17 +763,20 @@ public interface KTable<K, V> {
* started).
* <p>
* This is equivalent to calling {@link #to(String) #to(someTopicName)} and
* {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
* {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
* {@link StreamsBuilder#table(String, String)})
* {@link StreamsBuilder#table(String, Materialized)})
* The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
*
* @param topic the topic name
* @param queryableStoreName the state store name used for the result {@code KTable}; valid characters are ASCII
* alphanumerics, '.', '_' and '-'. If {@code null} this is the equivalent of {@link KTable#through(String)()}
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
* @deprecated use {@link #toStream()} followed by {@link KStream#to(String)}
* and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
*/
@Deprecated
KTable<K, V> through(final String topic,
final String queryableStoreName);
@ -784,16 +787,19 @@ public interface KTable<K, V> { @@ -784,16 +787,19 @@ public interface KTable<K, V> {
* started).
* <p>
* This is equivalent to calling {@link #to(String) #to(someTopicName)} and
* {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
* {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
* {@link StreamsBuilder#table(String, String)})
* {@link StreamsBuilder#table(String, Materialized)})
* The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
*
* @param topic the topic name
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
* @deprecated use {@link #toStream()} followed by {@link KStream#to(String)}
* and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
*/
@Deprecated
KTable<K, V> through(final String topic,
final StateStoreSupplier<KeyValueStore> storeSupplier);
@ -804,14 +810,17 @@ public interface KTable<K, V> { @@ -804,14 +810,17 @@ public interface KTable<K, V> {
* started).
* <p>
* This is equivalent to calling {@link #to(String) #to(someTopicName)} and
* {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName)}.
* {@link StreamsBuilder#table(String) StreamsBuilder#table(someTopicName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
* {@link StreamsBuilder#table(String)})
*
* @param topic the topic name
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
* @deprecated use {@link #toStream()} followed by {@link KStream#to(String)}
* and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
*/
@Deprecated
KTable<K, V> through(final String topic);
/**
@ -822,7 +831,7 @@ public interface KTable<K, V> { @@ -822,7 +831,7 @@ public interface KTable<K, V> {
* started).
* <p>
* This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and
* {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName)}.
* {@link StreamsBuilder#table(String) StreamsBuilder#table(someTopicName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
* {@link StreamsBuilder#table(String)})
@ -831,7 +840,10 @@ public interface KTable<K, V> { @@ -831,7 +840,10 @@ public interface KTable<K, V> {
* if not specified producer's {@link DefaultPartitioner} will be used
* @param topic the topic name
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
* @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
* and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
*/
@Deprecated
KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
final String topic);
@ -843,10 +855,10 @@ public interface KTable<K, V> { @@ -843,10 +855,10 @@ public interface KTable<K, V> {
* started).
* <p>
* This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and
* {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
* {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
* {@link StreamsBuilder#table(String, String)})
* {@link StreamsBuilder#table(String, Materialized)})
*
* @param partitioner the function used to determine how records are distributed among partitions of the topic,
* if not specified producer's {@link DefaultPartitioner} will be used
@ -854,7 +866,10 @@ public interface KTable<K, V> { @@ -854,7 +866,10 @@ public interface KTable<K, V> {
* @param queryableStoreName the state store name used for the result {@code KTable}.
* If {@code null} this is the equivalent of {@link KTable#through(StreamPartitioner, String)}
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
* @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
* and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
*/
@Deprecated
KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
final String topic,
final String queryableStoreName);
@ -867,17 +882,20 @@ public interface KTable<K, V> { @@ -867,17 +882,20 @@ public interface KTable<K, V> {
* started).
* <p>
* This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and
* {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
* {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
* {@link StreamsBuilder#table(String, String)})
* {@link StreamsBuilder#table(String, Materialized)})
*
* @param partitioner the function used to determine how records are distributed among partitions of the topic,
* if not specified producer's {@link DefaultPartitioner} will be used
* @param topic the topic name
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
* @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
* and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
*/
@Deprecated
KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
final String topic,
final StateStoreSupplier<KeyValueStore> storeSupplier);
@ -891,10 +909,10 @@ public interface KTable<K, V> { @@ -891,10 +909,10 @@ public interface KTable<K, V> {
* used&mdash;otherwise producer's {@link DefaultPartitioner} is used.
* <p>
* This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and
* {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
* {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
* {@link StreamsBuilder#table(String, String)})
* {@link StreamsBuilder#table(String, Materialized)})
*
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
@ -904,7 +922,10 @@ public interface KTable<K, V> { @@ -904,7 +922,10 @@ public interface KTable<K, V> {
* @param queryableStoreName the state store name used for the result {@code KTable}.
* If {@code null} this is the equivalent of {@link KTable#through(Serde, Serde, String)()}
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
* @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
* and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
*/
@Deprecated
KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde,
final String topic,
final String queryableStoreName);
@ -918,10 +939,10 @@ public interface KTable<K, V> { @@ -918,10 +939,10 @@ public interface KTable<K, V> {
* used&mdash;otherwise producer's {@link DefaultPartitioner} is used.
* <p>
* This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and
* {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
* {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
* {@link StreamsBuilder#table(String, String)})
* {@link StreamsBuilder#table(String, Materialized)})
*
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
@ -930,7 +951,10 @@ public interface KTable<K, V> { @@ -930,7 +951,10 @@ public interface KTable<K, V> {
* @param topic the topic name
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
* @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
* and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
*/
@Deprecated
KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde,
final String topic,
final StateStoreSupplier<KeyValueStore> storeSupplier);
@ -955,7 +979,10 @@ public interface KTable<K, V> { @@ -955,7 +979,10 @@ public interface KTable<K, V> {
* if not specified the default value serde defined in the configuration will be used
* @param topic the topic name
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
* @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
* and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
*/
@Deprecated
KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde,
final String topic);
@ -967,10 +994,10 @@ public interface KTable<K, V> { @@ -967,10 +994,10 @@ public interface KTable<K, V> {
* <p>
* This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
* #to(keySerde, valueSerde, partitioner, someTopicName)} and
* {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
* {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
* {@link StreamsBuilder#table(String, String)})
* {@link StreamsBuilder#table(String, Materialized)})
*
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
@ -984,7 +1011,10 @@ public interface KTable<K, V> { @@ -984,7 +1011,10 @@ public interface KTable<K, V> {
* @param queryableStoreName the state store name used for the result {@code KTable}.
* If {@code null} this is the equivalent of {@link KTable#through(Serde, Serde, StreamPartitioner, String)()}
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
* @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
* and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
*/
@Deprecated
KTable<K, V> through(final Serde<K> keySerde,
final Serde<V> valSerde,
final StreamPartitioner<? super K, ? super V> partitioner,
@ -999,10 +1029,10 @@ public interface KTable<K, V> { @@ -999,10 +1029,10 @@ public interface KTable<K, V> {
* <p>
* This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
* #to(keySerde, valueSerde, partitioner, someTopicName)} and
* {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
* {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
* {@link StreamsBuilder#table(String, String)})
* {@link StreamsBuilder#table(String, Materialized)})
*
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
@ -1015,7 +1045,10 @@ public interface KTable<K, V> { @@ -1015,7 +1045,10 @@ public interface KTable<K, V> {
* @param topic the topic name
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
* @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
* and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
*/
@Deprecated
KTable<K, V> through(final Serde<K> keySerde,
final Serde<V> valSerde,
final StreamPartitioner<? super K, ? super V> partitioner,
@ -1045,7 +1078,10 @@ public interface KTable<K, V> { @@ -1045,7 +1078,10 @@ public interface KTable<K, V> {
* be used
* @param topic the topic name
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
* @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
* and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
*/
@Deprecated
KTable<K, V> through(final Serde<K> keySerde,
final Serde<V> valSerde,
final StreamPartitioner<? super K, ? super V> partitioner,
@ -1058,7 +1094,9 @@ public interface KTable<K, V> { @@ -1058,7 +1094,9 @@ public interface KTable<K, V> {
* started).
*
* @param topic the topic name
* @deprecated use {@link #toStream()} followed by {@link KStream#to(String)}
*/
@Deprecated
void to(final String topic);
/**
@ -1070,7 +1108,9 @@ public interface KTable<K, V> { @@ -1070,7 +1108,9 @@ public interface KTable<K, V> {
* @param partitioner the function used to determine how records are distributed among partitions of the topic,
* if not specified producer's {@link DefaultPartitioner} will be used
* @param topic the topic name
* @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
*/
@Deprecated
void to(final StreamPartitioner<? super K, ? super V> partitioner,
final String topic);
@ -1087,7 +1127,9 @@ public interface KTable<K, V> { @@ -1087,7 +1127,9 @@ public interface KTable<K, V> {
* @param valSerde value serde used to send key-value pairs,
* if not specified the default value serde defined in the configuration will be used
* @param topic the topic name
* @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
*/
@Deprecated
void to(final Serde<K> keySerde,
final Serde<V> valSerde,
final String topic);
@ -1107,7 +1149,9 @@ public interface KTable<K, V> { @@ -1107,7 +1149,9 @@ public interface KTable<K, V> {
* {@link WindowedStreamPartitioner} will be used&mdash;otherwise {@link DefaultPartitioner} will
* be used
* @param topic the topic name
* @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
*/
@Deprecated
void to(final Serde<K> keySerde,
final Serde<V> valSerde,
final StreamPartitioner<? super K, ? super V> partitioner,

Loading…
Cancel
Save