Browse Source

KAFKA-4144 Follow-up: add one missing overload function to maintain backward compatibility

A follow up RP to fix [issue](2cd0b87bc8 (commitcomment-22200864))

Author: Jeyhun Karimov <je.karimov@gmail.com>

Reviewers: Matthias J. Sax, Eno Thereska, Bill Bejeck, Guozhang Wang

Closes #3109 from jeyhunkarimov/KAFKA-4144-follow-up
pull/3133/head
Jeyhun Karimov 8 years ago committed by Guozhang Wang
parent
commit
c5d44af774
  1. 37
      streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java

37
streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java

@ -1089,6 +1089,43 @@ public class KStreamBuilder extends TopologyBuilder {
return doGlobalTable(keySerde, valSerde, null, topic, storeSupplier); return doGlobalTable(keySerde, valSerde, null, topic, storeSupplier);
} }
/**
* Create a {@link GlobalKTable} for the specified topic.
* The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
* Input {@link KeyValue} pairs with {@code null} key will be dropped.
* <p>
* The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
* {@code queryableStoreName}.
* However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
* methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
* <p>
* To query the local {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
* ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long valueForKey = localStore.get(key);
* }</pre>
* Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
* regardless of the specified value in {@link StreamsConfig}.
*
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
* @param valSerde value serde used to send key-value pairs,
* if not specified the default value serde defined in the configuration will be used
* @param topic the topic name; cannot be {@code null}
* @param queryableStoreName the state store name; If {@code null} this is the equivalent of
* {@link KStreamBuilder#globalTable(Serde, Serde, String)} ()}
* @return a {@link GlobalKTable} for the specified topic
*/
@SuppressWarnings("unchecked")
public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
final Serde<V> valSerde,
final String topic,
final String queryableStoreName) {
return globalTable(keySerde, valSerde, null, topic, queryableStoreName);
}
/** /**
* Create a {@link GlobalKTable} for the specified topic. * Create a {@link GlobalKTable} for the specified topic.

Loading…
Cancel
Save