Browse Source

KAFKA-4163: NPE in StreamsMetadataState during re-balance operations

During rebalance operations the Cluster object gets set to Cluster.empty(). This can result in NPEs when doing certain operation on StreamsMetadataState. This should throw a StreamsException if the Cluster is empty as it is not yet (re-)initialized

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Eno Thereska, Guozhang Wang

Closes #1845 from dguy/streams-meta-hotfix
pull/1845/merge
Damian Guy 8 years ago committed by Guozhang Wang
parent
commit
3d74196f20
  1. 8
      streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
  2. 23
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
  3. 9
      streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
  4. 24
      streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
  5. 15
      streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
  6. 2
      streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
  7. 4
      streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
  8. 3
      streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
  9. 45
      streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
  10. 7
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
  11. 21
      streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
  12. 19
      streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
  13. 3
      streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
  14. 11
      streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
  15. 5
      streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
  16. 13
      streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java

8
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

@ -329,7 +329,8 @@ public class KafkaStreams { @@ -329,7 +329,8 @@ public class KafkaStreams {
* @param key Key to use to for partition
* @param keySerializer Serializer for the key
* @param <K> key type
* @return The {@link StreamsMetadata} for the storeName and key
* @return The {@link StreamsMetadata} for the storeName and key or {@link StreamsMetadata#NOT_AVAILABLE}
* if streams is (re-)initializing
*/
public <K> StreamsMetadata metadataForKey(final String storeName,
final K key,
@ -350,7 +351,8 @@ public class KafkaStreams { @@ -350,7 +351,8 @@ public class KafkaStreams {
* @param key Key to use to for partition
* @param partitioner Partitioner for the store
* @param <K> key type
* @return The {@link StreamsMetadata} for the storeName and key
* @return The {@link StreamsMetadata} for the storeName and key or {@link StreamsMetadata#NOT_AVAILABLE}
* if streams is (re-)initializing
*/
public <K> StreamsMetadata metadataForKey(final String storeName,
final K key,
@ -368,6 +370,8 @@ public class KafkaStreams { @@ -368,6 +370,8 @@ public class KafkaStreams {
* @param queryableStoreType accept only stores that are accepted by {@link QueryableStoreType#accepts(StateStore)}
* @param <T> return type
* @return A facade wrapping the {@link org.apache.kafka.streams.processor.StateStore} instances
* @throws org.apache.kafka.streams.errors.InvalidStateStoreException if the streams are (re-)initializing or
* a store with storeName and queryableStoreType doesnt' exist.
*/
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
validateIsRunning();

23
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java

@ -70,6 +70,10 @@ public class StreamsMetadataState { @@ -70,6 +70,10 @@ public class StreamsMetadataState {
public synchronized Collection<StreamsMetadata> getAllMetadataForStore(final String storeName) {
Objects.requireNonNull(storeName, "storeName cannot be null");
if (!isInitialized()) {
return Collections.emptyList();
}
final Set<String> sourceTopics = builder.stateStoreNameToSourceTopics().get(storeName);
if (sourceTopics == null) {
return Collections.emptyList();
@ -96,7 +100,8 @@ public class StreamsMetadataState { @@ -96,7 +100,8 @@ public class StreamsMetadataState {
* @param key Key to use
* @param keySerializer Serializer for the key
* @param <K> key type
* @return The {@link StreamsMetadata} for the storeName and key
* @return The {@link StreamsMetadata} for the storeName and key or {@link StreamsMetadata#NOT_AVAILABLE}
* if streams is (re-)initializing
*/
public synchronized <K> StreamsMetadata getMetadataWithKey(final String storeName,
final K key,
@ -105,10 +110,15 @@ public class StreamsMetadataState { @@ -105,10 +110,15 @@ public class StreamsMetadataState {
Objects.requireNonNull(storeName, "storeName can't be null");
Objects.requireNonNull(key, "key can't be null");
if (!isInitialized()) {
return StreamsMetadata.NOT_AVAILABLE;
}
final SourceTopicsInfo sourceTopicsInfo = getSourceTopicsInfo(storeName);
if (sourceTopicsInfo == null) {
return null;
}
return getStreamsMetadataForKey(storeName,
key,
new DefaultStreamPartitioner<>(keySerializer,
@ -131,7 +141,8 @@ public class StreamsMetadataState { @@ -131,7 +141,8 @@ public class StreamsMetadataState {
* @param key Key to use
* @param partitioner partitioner to use to find correct partition for key
* @param <K> key type
* @return The {@link StreamsMetadata} for the storeName and key
* @return The {@link StreamsMetadata} for the storeName and key or {@link StreamsMetadata#NOT_AVAILABLE}
* if streams is (re-)initializing
*/
public synchronized <K> StreamsMetadata getMetadataWithKey(final String storeName,
final K key,
@ -140,6 +151,10 @@ public class StreamsMetadataState { @@ -140,6 +151,10 @@ public class StreamsMetadataState {
Objects.requireNonNull(key, "key can't be null");
Objects.requireNonNull(partitioner, "partitioner can't be null");
if (!isInitialized()) {
return StreamsMetadata.NOT_AVAILABLE;
}
SourceTopicsInfo sourceTopicsInfo = getSourceTopicsInfo(storeName);
if (sourceTopicsInfo == null) {
return null;
@ -218,6 +233,10 @@ public class StreamsMetadataState { @@ -218,6 +233,10 @@ public class StreamsMetadataState {
return new SourceTopicsInfo(sourceTopics);
}
private boolean isInitialized() {
return !clusterMetadata.topics().isEmpty();
}
private class SourceTopicsInfo {
private final Set<String> sourceTopics;
private int maxPartitions;

9
streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java

@ -19,6 +19,7 @@ package org.apache.kafka.streams.state; @@ -19,6 +19,7 @@ package org.apache.kafka.streams.state;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.KafkaStreams;
import java.util.Collections;
import java.util.Set;
/**
@ -29,6 +30,14 @@ import java.util.Set; @@ -29,6 +30,14 @@ import java.util.Set;
* NOTE: This is a point in time view. It may change when rebalances happen.
*/
public class StreamsMetadata {
/**
* Sentinel to indicate that the StreamsMetadata is currently unavailable. This can occur during rebalance
* operations.
*/
public final static StreamsMetadata NOT_AVAILABLE = new StreamsMetadata(new HostInfo("unavailable", -1),
Collections.<String>emptySet(),
Collections.<TopicPartition>emptySet());
private final HostInfo hostInfo;
private final Set<String> stateStoreNames;
private final Set<TopicPartition> topicPartitions;

24
streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java

@ -15,6 +15,7 @@ @@ -15,6 +15,7 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
@ -48,10 +49,15 @@ public class CompositeReadOnlyKeyValueStore<K, V> implements ReadOnlyKeyValueSto @@ -48,10 +49,15 @@ public class CompositeReadOnlyKeyValueStore<K, V> implements ReadOnlyKeyValueSto
public V get(final K key) {
final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
for (ReadOnlyKeyValueStore<K, V> store : stores) {
V result = store.get(key);
if (result != null) {
return result;
try {
final V result = store.get(key);
if (result != null) {
return result;
}
} catch (InvalidStateStoreException e) {
throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");
}
}
return null;
}
@ -61,7 +67,11 @@ public class CompositeReadOnlyKeyValueStore<K, V> implements ReadOnlyKeyValueSto @@ -61,7 +67,11 @@ public class CompositeReadOnlyKeyValueStore<K, V> implements ReadOnlyKeyValueSto
final NextIteratorFunction<K, V> nextIteratorFunction = new NextIteratorFunction<K, V>() {
@Override
public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store) {
return store.range(from, to);
try {
return store.range(from, to);
} catch (InvalidStateStoreException e) {
throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");
}
}
};
final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
@ -73,7 +83,11 @@ public class CompositeReadOnlyKeyValueStore<K, V> implements ReadOnlyKeyValueSto @@ -73,7 +83,11 @@ public class CompositeReadOnlyKeyValueStore<K, V> implements ReadOnlyKeyValueSto
final NextIteratorFunction<K, V> nextIteratorFunction = new NextIteratorFunction<K, V>() {
@Override
public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store) {
return store.all();
try {
return store.all();
} catch (InvalidStateStoreException e) {
throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");
}
}
};
final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);

15
streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java

@ -15,6 +15,7 @@ @@ -15,6 +15,7 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
@ -44,11 +45,15 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K @@ -44,11 +45,15 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
final List<ReadOnlyWindowStore<K, V>> stores = provider.stores(storeName, windowStoreType);
for (ReadOnlyWindowStore<K, V> windowStore : stores) {
final WindowStoreIterator<V> result = windowStore.fetch(key, timeFrom, timeTo);
if (!result.hasNext()) {
result.close();
} else {
return result;
try {
final WindowStoreIterator<V> result = windowStore.fetch(key, timeFrom, timeTo);
if (!result.hasNext()) {
result.close();
} else {
return result;
}
} catch (InvalidStateStoreException e) {
throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");
}
}
return new WindowStoreIterator<V>() {

2
streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java

@ -46,7 +46,7 @@ public class QueryableStoreProvider { @@ -46,7 +46,7 @@ public class QueryableStoreProvider {
allStores.addAll(storeProvider.stores(storeName, queryableStoreType));
}
if (allStores.isEmpty()) {
throw new InvalidStateStoreException("Store: " + storeName + " is currently not available");
throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
}
return queryableStoreType.create(
new WrappingStoreProvider(storeProviders),

4
streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java

@ -39,14 +39,14 @@ public class StreamThreadStateStoreProvider implements StateStoreProvider { @@ -39,14 +39,14 @@ public class StreamThreadStateStoreProvider implements StateStoreProvider {
@Override
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
if (!streamThread.isInitialized()) {
throw new InvalidStateStoreException("Store: " + storeName + " is currently not available as the stream thread has not (re-)initialized yet");
throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
}
final List<T> stores = new ArrayList<>();
for (StreamTask streamTask : streamThread.tasks().values()) {
final StateStore store = streamTask.getStore(storeName);
if (store != null && queryableStoreType.accepts(store)) {
if (!store.isOpen()) {
throw new InvalidStateStoreException("Store: " + storeName + " isn't isOpen");
throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
}
stores.add((T) store);
}

3
streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java

@ -48,8 +48,7 @@ public class WrappingStoreProvider implements StateStoreProvider { @@ -48,8 +48,7 @@ public class WrappingStoreProvider implements StateStoreProvider {
allStores.addAll(stores);
}
if (allStores.isEmpty()) {
throw new InvalidStateStoreException("Store " + storeName + " is currently "
+ "unavailable");
throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
}
return allStores;
}

45
streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java

@ -74,8 +74,6 @@ import java.util.concurrent.TimeUnit; @@ -74,8 +74,6 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
@RunWith(Parameterized.class)
public class QueryableStateIntegrationTest {
private static final int NUM_BROKERS = 1;
@ -265,24 +263,23 @@ public class QueryableStateIntegrationTest { @@ -265,24 +263,23 @@ public class QueryableStateIntegrationTest {
TestUtils.waitForCondition(new TestCondition() {
@Override
public boolean conditionMet() {
final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
if (metadata == null) {
return false;
}
final int index = metadata.hostInfo().port();
final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
final ReadOnlyKeyValueStore<String, Long> store;
try {
store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
} catch (final InvalidStateStoreException e) {
// rebalance
return false;
final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
if (metadata == null) {
return false;
}
final int index = metadata.hostInfo().port();
final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
final ReadOnlyKeyValueStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
return store != null && store.get(key) != null;
} catch (final IllegalStateException e) {
// Kafka Streams instance may have closed but rebalance hasn't happened
return false;
}
} catch (final InvalidStateStoreException e) {
// rebalance
return false;
}
return store != null && store.get(key) != null;
}
}, 30000, "waiting for metadata, store and value to be non null");
}
@ -296,15 +293,15 @@ public class QueryableStateIntegrationTest { @@ -296,15 +293,15 @@ public class QueryableStateIntegrationTest {
TestUtils.waitForCondition(new TestCondition() {
@Override
public boolean conditionMet() {
final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
if (metadata == null) {
return false;
}
final int index = metadata.hostInfo().port();
final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
final ReadOnlyWindowStore<String, Long> store;
try {
store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
if (metadata == null) {
return false;
}
final int index = metadata.hostInfo().port();
final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
final ReadOnlyWindowStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
return store != null && store.fetch(key, from, to) != null;
} catch (final IllegalStateException e) {
// Kafka Streams instance may have closed but rebalance hasn't happened
return false;
@ -312,7 +309,7 @@ public class QueryableStateIntegrationTest { @@ -312,7 +309,7 @@ public class QueryableStateIntegrationTest {
// rebalance
return false;
}
return store != null && store.fetch(key, from, to) != null;
}
}, 30000, "waiting for metadata, store and value to be non null");
}

7
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java

@ -209,6 +209,13 @@ public class StreamsMetadataStateTest { @@ -209,6 +209,13 @@ public class StreamsMetadataStateTest {
assertEquals(expected, actual);
}
@Test
public void shouldReturnNotAvailableWhenClusterIsEmpty() throws Exception {
discovery.onChange(Collections.<HostInfo, Set<TopicPartition>>emptyMap(), Cluster.empty());
final StreamsMetadata result = discovery.getMetadataWithKey("table-one", "a", Serdes.String().serializer());
assertEquals(StreamsMetadata.NOT_AVAILABLE, result);
}
@Test
public void shouldGetInstanceWithKeyWithMergedStreams() throws Exception {
final TopicPartition topic2P2 = new TopicPartition("topic-two", 2);

21
streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java

@ -18,6 +18,7 @@ import org.apache.kafka.streams.KeyValue; @@ -18,6 +18,7 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.test.StateStoreProviderStub;
import org.junit.Before;
import org.junit.Test;
@ -42,8 +43,8 @@ public class CompositeReadOnlyKeyValueStoreTest { @@ -42,8 +43,8 @@ public class CompositeReadOnlyKeyValueStoreTest {
@SuppressWarnings("unchecked")
@Before
public void before() {
final StateStoreProviderStub stubProviderOne = new StateStoreProviderStub();
stubProviderTwo = new StateStoreProviderStub();
final StateStoreProviderStub stubProviderOne = new StateStoreProviderStub(false);
stubProviderTwo = new StateStoreProviderStub(false);
stubOneUnderlying = newStoreInstance();
stubProviderOne.addStore(storeName, stubOneUnderlying);
@ -148,19 +149,19 @@ public class CompositeReadOnlyKeyValueStoreTest { @@ -148,19 +149,19 @@ public class CompositeReadOnlyKeyValueStoreTest {
}
@Test(expected = InvalidStateStoreException.class)
public void shouldThrowInvalidStoreExceptionIfNoStoresExistOnGet() throws Exception {
noStores().get("anything");
public void shouldThrowInvalidStoreExceptionDuringRebalance() throws Exception {
rebalancing().get("anything");
}
@Test(expected = InvalidStateStoreException.class)
public void shouldThrowInvalidStoreExceptionIfNoStoresExistOnRange() throws Exception {
noStores().range("anything", "something");
public void shouldThrowInvalidStoreExceptionOnRangeDuringRebalance() throws Exception {
rebalancing().range("anything", "something");
}
@Test(expected = InvalidStateStoreException.class)
public void shouldThrowInvalidStoreExceptionIfNoStoresExistOnAll() throws Exception {
noStores().all();
public void shouldThrowInvalidStoreExceptionOnAllDuringRebalance() throws Exception {
rebalancing().all();
}
@Test
@ -192,8 +193,8 @@ public class CompositeReadOnlyKeyValueStoreTest { @@ -192,8 +193,8 @@ public class CompositeReadOnlyKeyValueStoreTest {
assertEquals(Long.MAX_VALUE, theStore.approximateNumEntries());
}
private CompositeReadOnlyKeyValueStore<Object, Object> noStores() {
return new CompositeReadOnlyKeyValueStore<>(new WrappingStoreProvider(Collections.<StateStoreProvider>emptyList()),
private CompositeReadOnlyKeyValueStore<Object, Object> rebalancing() {
return new CompositeReadOnlyKeyValueStore<>(new WrappingStoreProvider(Collections.<StateStoreProvider>singletonList(new StateStoreProviderStub(true))),
QueryableStoreTypes.keyValueStore(), storeName);
}

19
streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java

@ -15,8 +15,10 @@ @@ -15,8 +15,10 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.test.StateStoreProviderStub;
import org.junit.Before;
import org.junit.Test;
@ -42,8 +44,8 @@ public class CompositeReadOnlyWindowStoreTest { @@ -42,8 +44,8 @@ public class CompositeReadOnlyWindowStoreTest {
@Before
public void before() {
stubProviderOne = new StateStoreProviderStub();
stubProviderTwo = new StateStoreProviderStub();
stubProviderOne = new StateStoreProviderStub(false);
stubProviderTwo = new StateStoreProviderStub(false);
underlyingWindowStore = new ReadOnlyWindowStoreStub<>();
stubProviderOne.addStore(storeName, underlyingWindowStore);
@ -103,6 +105,19 @@ public class CompositeReadOnlyWindowStoreTest { @@ -103,6 +105,19 @@ public class CompositeReadOnlyWindowStoreTest {
assertEquals(Collections.singletonList(new KeyValue<>(1L, "my-value")), results);
}
@Test(expected = InvalidStateStoreException.class)
public void shouldThrowInvalidStateStoreExceptionOnRebalance() throws Exception {
final CompositeReadOnlyWindowStore<Object, Object> store = new CompositeReadOnlyWindowStore<>(new StateStoreProviderStub(true), QueryableStoreTypes.windowStore(), "foo");
store.fetch("key", 1, 10);
}
@Test(expected = InvalidStateStoreException.class)
public void shouldThrowInvalidStateStoreExceptionIfFetchThrows() throws Exception {
underlyingWindowStore.setOpen(false);
underlyingWindowStore.fetch("key", 1, 10);
}
static <K, V> List<KeyValue<K, V>> toList(final Iterator<KeyValue<K, V>> iterator) {
final List<KeyValue<K, V>> results = new ArrayList<>();

3
streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java

@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals; @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.NoOpWindowStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.test.StateStoreProviderStub;
import org.junit.Before;
import org.junit.Test;
@ -33,7 +34,7 @@ public class QueryableStoreProviderTest { @@ -33,7 +34,7 @@ public class QueryableStoreProviderTest {
@Before
public void before() {
final StateStoreProviderStub theStoreProvider = new StateStoreProviderStub();
final StateStoreProviderStub theStoreProvider = new StateStoreProviderStub(false);
theStoreProvider.addStore(keyValueStore, new StateStoreTestUtils.NoOpReadOnlyStore<>());
theStoreProvider.addStore(windowStore, new NoOpWindowStore());
storeProvider =

11
streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java

@ -15,6 +15,7 @@ @@ -15,6 +15,7 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
@ -32,9 +33,13 @@ import java.util.Map; @@ -32,9 +33,13 @@ import java.util.Map;
public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>, StateStore {
private final Map<Long, Map<K, V>> data = new HashMap<>();
private boolean open = true;
@Override
public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
if (!open) {
throw new InvalidStateStoreException("Store is not open");
}
final List<KeyValue<Long, V>> results = new ArrayList<>();
for (long now = timeFrom; now <= timeTo; now++) {
final Map<K, V> kvMap = data.get(now);
@ -79,7 +84,11 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>, @@ -79,7 +84,11 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
@Override
public boolean isOpen() {
return false;
return open;
}
public void setOpen(final boolean open) {
this.open = open;
}
private class TheWindowStoreIterator<E> implements WindowStoreIterator<E> {

5
streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java

@ -22,6 +22,7 @@ import org.apache.kafka.streams.state.NoOpWindowStore; @@ -22,6 +22,7 @@ import org.apache.kafka.streams.state.NoOpWindowStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.test.StateStoreProviderStub;
import org.junit.Before;
import org.junit.Test;
@ -37,8 +38,8 @@ public class WrappingStoreProviderTest { @@ -37,8 +38,8 @@ public class WrappingStoreProviderTest {
@Before
public void before() {
final StateStoreProviderStub stubProviderOne = new StateStoreProviderStub();
final StateStoreProviderStub stubProviderTwo = new StateStoreProviderStub();
final StateStoreProviderStub stubProviderOne = new StateStoreProviderStub(false);
final StateStoreProviderStub stubProviderTwo = new StateStoreProviderStub(false);
stubProviderOne.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", String.class, String.class));

13
streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreProviderStub.java → streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java

@ -12,10 +12,12 @@ @@ -12,10 +12,12 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.kafka.streams.state.internals;
package org.apache.kafka.test;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.internals.StateStoreProvider;
import java.util.Collections;
import java.util.HashMap;
@ -25,10 +27,19 @@ import java.util.Map; @@ -25,10 +27,19 @@ import java.util.Map;
public class StateStoreProviderStub implements StateStoreProvider {
private final Map<String, StateStore> stores = new HashMap<>();
private final boolean throwException;
public StateStoreProviderStub(final boolean throwException) {
this.throwException = throwException;
}
@SuppressWarnings("unchecked")
@Override
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
if (throwException) {
throw new InvalidStateStoreException("store is unavailable");
}
if (stores.containsKey(storeName) && queryableStoreType.accepts(stores.get(storeName))) {
return (List<T>) Collections.singletonList(stores.get(storeName));
}
Loading…
Cancel
Save