Browse Source

KAFKA-7420: Global store surrounded by read only implementation (#5865)

Reviewers: Matthias J. Sax <matthias@confluent.io>, Kamal Chandraprakash (@kamalcph), Bill Bejeck <bill@confluent.io>
pull/5775/head
Nikolay 6 years ago committed by Matthias J. Sax
parent
commit
ec501f305e
  1. 181
      streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
  2. 260
      streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java

181
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java

@ -19,13 +19,20 @@ package org.apache.kafka.streams.processor.internals; @@ -19,13 +19,20 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.ThreadCache;
import java.time.Duration;
@ -65,6 +72,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re @@ -65,6 +72,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
/**
* @throws StreamsException if an attempt is made to access this state store from an unknown node
*/
@SuppressWarnings("unchecked")
@Override
public StateStore getStateStore(final String name) {
if (currentNode() == null) {
@ -73,6 +81,14 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re @@ -73,6 +81,14 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
final StateStore global = stateManager.getGlobalStore(name);
if (global != null) {
if (global instanceof KeyValueStore) {
return new KeyValueStoreReadOnlyDecorator((KeyValueStore) global);
} else if (global instanceof WindowStore) {
return new WindowStoreReadOnlyDecorator((WindowStore) global);
} else if (global instanceof SessionStore) {
return new SessionStoreReadOnlyDecorator((SessionStore) global);
}
return global;
}
@ -180,4 +196,169 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re @@ -180,4 +196,169 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
return streamTimeSupplier.get();
}
private abstract static class StateStoreReadOnlyDecorator<T extends StateStore> implements StateStore {
static final String ERROR_MESSAGE = "Global store is read only";
final T underlying;
StateStoreReadOnlyDecorator(final T underlying) {
this.underlying = underlying;
}
@Override
public String name() {
return underlying.name();
}
@Override
public void init(final ProcessorContext context, final StateStore root) {
underlying.init(context, root);
}
@Override
public void flush() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public void close() {
underlying.close();
}
@Override
public boolean persistent() {
return underlying.persistent();
}
@Override
public boolean isOpen() {
return underlying.isOpen();
}
}
private static class KeyValueStoreReadOnlyDecorator<K, V> extends StateStoreReadOnlyDecorator<KeyValueStore<K, V>> implements KeyValueStore<K, V> {
KeyValueStoreReadOnlyDecorator(final KeyValueStore<K, V> underlying) {
super(underlying);
}
@Override
public V get(final K key) {
return underlying.get(key);
}
@Override
public KeyValueIterator<K, V> range(final K from, final K to) {
return underlying.range(from, to);
}
@Override
public KeyValueIterator<K, V> all() {
return underlying.all();
}
@Override
public long approximateNumEntries() {
return underlying.approximateNumEntries();
}
@Override
public void put(final K key, final V value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public V putIfAbsent(final K key, final V value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public void putAll(final List entries) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public V delete(final K key) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
}
private static class WindowStoreReadOnlyDecorator<K, V> extends StateStoreReadOnlyDecorator<WindowStore<K, V>> implements WindowStore<K, V> {
WindowStoreReadOnlyDecorator(final WindowStore<K, V> underlying) {
super(underlying);
}
@Override
public void put(final K key, final V value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public void put(final K key, final V value, final long windowStartTimestamp) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public V fetch(final K key, final long time) {
return underlying.fetch(key, time);
}
@Deprecated
@Override
public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
return underlying.fetch(key, timeFrom, timeTo);
}
@Deprecated
@Override
public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
return underlying.fetch(from, to, timeFrom, timeTo);
}
@Override
public KeyValueIterator<Windowed<K>, V> all() {
return underlying.all();
}
@Deprecated
@Override
public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
return underlying.fetchAll(timeFrom, timeTo);
}
}
private static class SessionStoreReadOnlyDecorator<K, AGG> extends StateStoreReadOnlyDecorator<SessionStore<K, AGG>> implements SessionStore<K, AGG> {
SessionStoreReadOnlyDecorator(final SessionStore<K, AGG> underlying) {
super(underlying);
}
@Override
public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) {
return underlying.findSessions(key, earliestSessionEndTime, latestSessionStartTime);
}
@Override
public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) {
return underlying.findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
}
@Override
public void remove(final Windowed sessionKey) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public void put(final Windowed<K> sessionKey, final AGG aggregate) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
return underlying.fetch(key);
}
@Override
public KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to) {
return underlying.fetch(from, to);
}
}
}

260
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java

@ -0,0 +1,260 @@ @@ -0,0 +1,260 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.junit.Before;
import org.junit.Test;
import static java.util.Collections.emptySet;
import static org.easymock.EasyMock.anyLong;
import static org.easymock.EasyMock.anyString;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.replay;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class ProcessorContextImplTest {
private ProcessorContextImpl context;
private static final String KEY = "key";
private static final long VAL = 42L;
private static final String STORE_NAME = "underlying-store";
private boolean initExecuted;
private boolean closeExecuted;
private KeyValueIterator<String, Long> rangeIter;
private KeyValueIterator<String, Long> allIter;
private List<KeyValueIterator<Windowed<String>, Long>> iters = new ArrayList<>(7);
private WindowStoreIterator<Long> windowStoreIter;
@Before
public void setup() {
rangeIter = mock(KeyValueIterator.class);
allIter = mock(KeyValueIterator.class);
windowStoreIter = mock(WindowStoreIterator.class);
for (int i = 0; i < 7; i++) {
iters.add(i, mock(KeyValueIterator.class));
}
final StreamsConfig streamsConfig = mock(StreamsConfig.class);
expect(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).andReturn("add-id");
expect(streamsConfig.defaultValueSerde()).andReturn(Serdes.ByteArray());
expect(streamsConfig.defaultKeySerde()).andReturn(Serdes.ByteArray());
replay(streamsConfig);
final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
expect(stateManager.getGlobalStore("KeyValueStore")).andReturn(keyValueStoreMock());
expect(stateManager.getGlobalStore("WindowStore")).andReturn(windowStoreMock());
expect(stateManager.getGlobalStore("SessionStore")).andReturn(sessionStoreMock());
replay(stateManager);
context = new ProcessorContextImpl(
mock(TaskId.class),
mock(StreamTask.class),
streamsConfig,
mock(RecordCollector.class),
stateManager,
mock(StreamsMetricsImpl.class),
mock(ThreadCache.class)
);
context.setCurrentNode(new ProcessorNode<String, Long>("fake", null, emptySet()));
}
@Test
public void testKeyValueStore() {
doTest("KeyValueStore", (Consumer<KeyValueStore<String, Long>>) store -> {
checkThrowsUnsupportedOperation(() -> store.put("1", 1L), "put");
checkThrowsUnsupportedOperation(() -> store.putIfAbsent("1", 1L), "putIfAbsent");
checkThrowsUnsupportedOperation(() -> store.putAll(Collections.emptyList()), "putAll");
checkThrowsUnsupportedOperation(() -> store.delete("1"), "delete");
assertEquals((Long) VAL, store.get(KEY));
assertEquals(rangeIter, store.range("one", "two"));
assertEquals(allIter, store.all());
assertEquals(VAL, store.approximateNumEntries());
});
}
@Test
public void testWindowStore() {
doTest("WindowStore", (Consumer<WindowStore<String, Long>>) store -> {
checkThrowsUnsupportedOperation(() -> store.put("1", 1L, 1L), "put");
checkThrowsUnsupportedOperation(() -> store.put("1", 1L), "put");
assertEquals(iters.get(0), store.fetchAll(0L, 0L));
assertEquals(windowStoreIter, store.fetch(KEY, 0L, 1L));
assertEquals(iters.get(1), store.fetch(KEY, KEY, 0L, 1L));
assertEquals((Long) VAL, store.fetch(KEY, 1L));
assertEquals(iters.get(2), store.all());
});
}
@Test
public void testSessionStore() {
doTest("SessionStore", (Consumer<SessionStore<String, Long>>) store -> {
checkThrowsUnsupportedOperation(() -> store.remove(null), "remove");
checkThrowsUnsupportedOperation(() -> store.put(null, null), "put");
assertEquals(iters.get(3), store.findSessions(KEY, 1L, 2L));
assertEquals(iters.get(4), store.findSessions(KEY, KEY, 1L, 2L));
assertEquals(iters.get(5), store.fetch(KEY));
assertEquals(iters.get(6), store.fetch(KEY, KEY));
});
}
private KeyValueStore<String, Long> keyValueStoreMock() {
final KeyValueStore<String, Long> keyValueStoreMock = mock(KeyValueStore.class);
initStateStoreMock(keyValueStoreMock);
expect(keyValueStoreMock.get(KEY)).andReturn(VAL);
expect(keyValueStoreMock.approximateNumEntries()).andReturn(VAL);
expect(keyValueStoreMock.range("one", "two")).andReturn(rangeIter);
expect(keyValueStoreMock.all()).andReturn(allIter);
replay(keyValueStoreMock);
return keyValueStoreMock;
}
private WindowStore<String, Long> windowStoreMock() {
final WindowStore<String, Long> windowStore = mock(WindowStore.class);
initStateStoreMock(windowStore);
expect(windowStore.fetchAll(anyLong(), anyLong())).andReturn(iters.get(0));
expect(windowStore.fetch(anyString(), anyString(), anyLong(), anyLong())).andReturn(iters.get(1));
expect(windowStore.fetch(anyString(), anyLong(), anyLong())).andReturn(windowStoreIter);
expect(windowStore.fetch(anyString(), anyLong())).andReturn(VAL);
expect(windowStore.all()).andReturn(iters.get(2));
replay(windowStore);
return windowStore;
}
private SessionStore<String, Long> sessionStoreMock() {
final SessionStore<String, Long> sessionStore = mock(SessionStore.class);
initStateStoreMock(sessionStore);
expect(sessionStore.findSessions(anyString(), anyLong(), anyLong())).andReturn(iters.get(3));
expect(sessionStore.findSessions(anyString(), anyString(), anyLong(), anyLong())).andReturn(iters.get(4));
expect(sessionStore.fetch(anyString())).andReturn(iters.get(5));
expect(sessionStore.fetch(anyString(), anyString())).andReturn(iters.get(6));
replay(sessionStore);
return sessionStore;
}
private void initStateStoreMock(final StateStore windowStore) {
expect(windowStore.name()).andReturn(STORE_NAME);
expect(windowStore.persistent()).andReturn(true);
expect(windowStore.isOpen()).andReturn(true);
windowStore.init(null, null);
expectLastCall().andAnswer(() -> {
initExecuted = true;
return null;
});
windowStore.close();
expectLastCall().andAnswer(() -> {
closeExecuted = true;
return null;
});
}
private <T extends StateStore> void doTest(final String name, final Consumer<T> checker) {
final Processor processor = new Processor<String, Long>() {
@Override
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context) {
final T store = (T) context.getStateStore(name);
checkStateStoreMethods(store);
checker.accept(store);
}
@Override
public void process(final String k, final Long v) {
//No-op.
}
@Override
public void close() {
//No-op.
}
};
processor.init(context);
}
private void checkStateStoreMethods(final StateStore store) {
checkThrowsUnsupportedOperation(store::flush, "flush");
assertEquals(STORE_NAME, store.name());
assertTrue(store.persistent());
assertTrue(store.isOpen());
store.init(null, null);
assertTrue(initExecuted);
store.close();
assertTrue(closeExecuted);
}
private void checkThrowsUnsupportedOperation(final Runnable check, final String name) {
try {
check.run();
fail(name + " should throw exception");
} catch (final UnsupportedOperationException e) {
//ignore.
}
}
}
Loading…
Cancel
Save