From ec501f305e53a09072580fb3824048c170d32a48 Mon Sep 17 00:00:00 2001 From: Nikolay Date: Wed, 5 Dec 2018 22:25:52 +0300 Subject: [PATCH] KAFKA-7420: Global store surrounded by read only implementation (#5865) Reviewers: Matthias J. Sax , Kamal Chandraprakash (@kamalcph), Bill Bejeck --- .../internals/ProcessorContextImpl.java | 181 ++++++++++++ .../internals/ProcessorContextImplTest.java | 260 ++++++++++++++++++ 2 files changed, 441 insertions(+) create mode 100644 streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 913e34e7d96..c79ec35328a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -19,13 +19,20 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.internals.ApiUtils; +import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.Cancellable; +import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.streams.state.internals.ThreadCache; import java.time.Duration; @@ -65,6 +72,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re /** * @throws StreamsException if an attempt is made to access this state store from an unknown node */ + @SuppressWarnings("unchecked") @Override public StateStore getStateStore(final String name) { if (currentNode() == null) { @@ -73,6 +81,14 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re final StateStore global = stateManager.getGlobalStore(name); if (global != null) { + if (global instanceof KeyValueStore) { + return new KeyValueStoreReadOnlyDecorator((KeyValueStore) global); + } else if (global instanceof WindowStore) { + return new WindowStoreReadOnlyDecorator((WindowStore) global); + } else if (global instanceof SessionStore) { + return new SessionStoreReadOnlyDecorator((SessionStore) global); + } + return global; } @@ -180,4 +196,169 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re return streamTimeSupplier.get(); } + private abstract static class StateStoreReadOnlyDecorator implements StateStore { + static final String ERROR_MESSAGE = "Global store is read only"; + + final T underlying; + + StateStoreReadOnlyDecorator(final T underlying) { + this.underlying = underlying; + } + + @Override + public String name() { + return underlying.name(); + } + + @Override + public void init(final ProcessorContext context, final StateStore root) { + underlying.init(context, root); + } + + @Override + public void flush() { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + + @Override + public void close() { + underlying.close(); + } + + @Override + public boolean persistent() { + return underlying.persistent(); + } + + @Override + public boolean isOpen() { + return underlying.isOpen(); + } + } + + private static class KeyValueStoreReadOnlyDecorator extends StateStoreReadOnlyDecorator> implements KeyValueStore { + KeyValueStoreReadOnlyDecorator(final KeyValueStore underlying) { + super(underlying); + } + + @Override + public V get(final K key) { + return underlying.get(key); + } + + @Override + public KeyValueIterator range(final K from, final K to) { + return underlying.range(from, to); + } + + @Override + public KeyValueIterator all() { + return underlying.all(); + } + + @Override + public long approximateNumEntries() { + return underlying.approximateNumEntries(); + } + + @Override + public void put(final K key, final V value) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + + @Override + public V putIfAbsent(final K key, final V value) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + + @Override + public void putAll(final List entries) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + + @Override + public V delete(final K key) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + } + + private static class WindowStoreReadOnlyDecorator extends StateStoreReadOnlyDecorator> implements WindowStore { + WindowStoreReadOnlyDecorator(final WindowStore underlying) { + super(underlying); + } + + @Override + public void put(final K key, final V value) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + + @Override + public void put(final K key, final V value, final long windowStartTimestamp) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + + @Override + public V fetch(final K key, final long time) { + return underlying.fetch(key, time); + } + + @Deprecated + @Override + public WindowStoreIterator fetch(final K key, final long timeFrom, final long timeTo) { + return underlying.fetch(key, timeFrom, timeTo); + } + + @Deprecated + @Override + public KeyValueIterator, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) { + return underlying.fetch(from, to, timeFrom, timeTo); + } + + @Override + public KeyValueIterator, V> all() { + return underlying.all(); + } + + @Deprecated + @Override + public KeyValueIterator, V> fetchAll(final long timeFrom, final long timeTo) { + return underlying.fetchAll(timeFrom, timeTo); + } + } + + private static class SessionStoreReadOnlyDecorator extends StateStoreReadOnlyDecorator> implements SessionStore { + SessionStoreReadOnlyDecorator(final SessionStore underlying) { + super(underlying); + } + + @Override + public KeyValueIterator, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) { + return underlying.findSessions(key, earliestSessionEndTime, latestSessionStartTime); + } + + @Override + public KeyValueIterator, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { + return underlying.findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime); + } + + @Override + public void remove(final Windowed sessionKey) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + + @Override + public void put(final Windowed sessionKey, final AGG aggregate) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + + @Override + public KeyValueIterator, AGG> fetch(final K key) { + return underlying.fetch(key); + } + + @Override + public KeyValueIterator, AGG> fetch(final K from, final K to) { + return underlying.fetch(from, to); + } + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java new file mode 100644 index 00000000000..fa5f597aa89 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.apache.kafka.streams.state.internals.ThreadCache; +import org.junit.Before; +import org.junit.Test; + +import static java.util.Collections.emptySet; +import static org.easymock.EasyMock.anyLong; +import static org.easymock.EasyMock.anyString; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.mock; +import static org.easymock.EasyMock.replay; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class ProcessorContextImplTest { + private ProcessorContextImpl context; + + private static final String KEY = "key"; + private static final long VAL = 42L; + private static final String STORE_NAME = "underlying-store"; + + private boolean initExecuted; + private boolean closeExecuted; + private KeyValueIterator rangeIter; + private KeyValueIterator allIter; + + private List, Long>> iters = new ArrayList<>(7); + private WindowStoreIterator windowStoreIter; + + @Before + public void setup() { + rangeIter = mock(KeyValueIterator.class); + allIter = mock(KeyValueIterator.class); + windowStoreIter = mock(WindowStoreIterator.class); + + for (int i = 0; i < 7; i++) { + iters.add(i, mock(KeyValueIterator.class)); + } + + final StreamsConfig streamsConfig = mock(StreamsConfig.class); + expect(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).andReturn("add-id"); + expect(streamsConfig.defaultValueSerde()).andReturn(Serdes.ByteArray()); + expect(streamsConfig.defaultKeySerde()).andReturn(Serdes.ByteArray()); + replay(streamsConfig); + + final ProcessorStateManager stateManager = mock(ProcessorStateManager.class); + + expect(stateManager.getGlobalStore("KeyValueStore")).andReturn(keyValueStoreMock()); + expect(stateManager.getGlobalStore("WindowStore")).andReturn(windowStoreMock()); + expect(stateManager.getGlobalStore("SessionStore")).andReturn(sessionStoreMock()); + + replay(stateManager); + + context = new ProcessorContextImpl( + mock(TaskId.class), + mock(StreamTask.class), + streamsConfig, + mock(RecordCollector.class), + stateManager, + mock(StreamsMetricsImpl.class), + mock(ThreadCache.class) + ); + + context.setCurrentNode(new ProcessorNode("fake", null, emptySet())); + } + + @Test + public void testKeyValueStore() { + doTest("KeyValueStore", (Consumer>) store -> { + checkThrowsUnsupportedOperation(() -> store.put("1", 1L), "put"); + checkThrowsUnsupportedOperation(() -> store.putIfAbsent("1", 1L), "putIfAbsent"); + checkThrowsUnsupportedOperation(() -> store.putAll(Collections.emptyList()), "putAll"); + checkThrowsUnsupportedOperation(() -> store.delete("1"), "delete"); + + assertEquals((Long) VAL, store.get(KEY)); + assertEquals(rangeIter, store.range("one", "two")); + assertEquals(allIter, store.all()); + assertEquals(VAL, store.approximateNumEntries()); + }); + } + + @Test + public void testWindowStore() { + doTest("WindowStore", (Consumer>) store -> { + checkThrowsUnsupportedOperation(() -> store.put("1", 1L, 1L), "put"); + checkThrowsUnsupportedOperation(() -> store.put("1", 1L), "put"); + + assertEquals(iters.get(0), store.fetchAll(0L, 0L)); + assertEquals(windowStoreIter, store.fetch(KEY, 0L, 1L)); + assertEquals(iters.get(1), store.fetch(KEY, KEY, 0L, 1L)); + assertEquals((Long) VAL, store.fetch(KEY, 1L)); + assertEquals(iters.get(2), store.all()); + }); + } + + @Test + public void testSessionStore() { + doTest("SessionStore", (Consumer>) store -> { + checkThrowsUnsupportedOperation(() -> store.remove(null), "remove"); + checkThrowsUnsupportedOperation(() -> store.put(null, null), "put"); + + assertEquals(iters.get(3), store.findSessions(KEY, 1L, 2L)); + assertEquals(iters.get(4), store.findSessions(KEY, KEY, 1L, 2L)); + assertEquals(iters.get(5), store.fetch(KEY)); + assertEquals(iters.get(6), store.fetch(KEY, KEY)); + }); + } + + private KeyValueStore keyValueStoreMock() { + final KeyValueStore keyValueStoreMock = mock(KeyValueStore.class); + + initStateStoreMock(keyValueStoreMock); + + expect(keyValueStoreMock.get(KEY)).andReturn(VAL); + expect(keyValueStoreMock.approximateNumEntries()).andReturn(VAL); + + expect(keyValueStoreMock.range("one", "two")).andReturn(rangeIter); + expect(keyValueStoreMock.all()).andReturn(allIter); + + replay(keyValueStoreMock); + + return keyValueStoreMock; + } + + private WindowStore windowStoreMock() { + final WindowStore windowStore = mock(WindowStore.class); + + initStateStoreMock(windowStore); + + expect(windowStore.fetchAll(anyLong(), anyLong())).andReturn(iters.get(0)); + expect(windowStore.fetch(anyString(), anyString(), anyLong(), anyLong())).andReturn(iters.get(1)); + expect(windowStore.fetch(anyString(), anyLong(), anyLong())).andReturn(windowStoreIter); + expect(windowStore.fetch(anyString(), anyLong())).andReturn(VAL); + expect(windowStore.all()).andReturn(iters.get(2)); + + replay(windowStore); + + return windowStore; + } + + private SessionStore sessionStoreMock() { + final SessionStore sessionStore = mock(SessionStore.class); + + initStateStoreMock(sessionStore); + + expect(sessionStore.findSessions(anyString(), anyLong(), anyLong())).andReturn(iters.get(3)); + expect(sessionStore.findSessions(anyString(), anyString(), anyLong(), anyLong())).andReturn(iters.get(4)); + expect(sessionStore.fetch(anyString())).andReturn(iters.get(5)); + expect(sessionStore.fetch(anyString(), anyString())).andReturn(iters.get(6)); + + replay(sessionStore); + + return sessionStore; + } + + private void initStateStoreMock(final StateStore windowStore) { + expect(windowStore.name()).andReturn(STORE_NAME); + expect(windowStore.persistent()).andReturn(true); + expect(windowStore.isOpen()).andReturn(true); + + windowStore.init(null, null); + expectLastCall().andAnswer(() -> { + initExecuted = true; + return null; + }); + + windowStore.close(); + expectLastCall().andAnswer(() -> { + closeExecuted = true; + return null; + }); + } + + private void doTest(final String name, final Consumer checker) { + final Processor processor = new Processor() { + @Override + @SuppressWarnings("unchecked") + public void init(final ProcessorContext context) { + final T store = (T) context.getStateStore(name); + + checkStateStoreMethods(store); + + checker.accept(store); + + } + + @Override + public void process(final String k, final Long v) { + //No-op. + } + + @Override + public void close() { + //No-op. + } + }; + + processor.init(context); + } + + private void checkStateStoreMethods(final StateStore store) { + checkThrowsUnsupportedOperation(store::flush, "flush"); + + assertEquals(STORE_NAME, store.name()); + assertTrue(store.persistent()); + assertTrue(store.isOpen()); + + store.init(null, null); + assertTrue(initExecuted); + + store.close(); + assertTrue(closeExecuted); + } + + private void checkThrowsUnsupportedOperation(final Runnable check, final String name) { + try { + check.run(); + fail(name + " should throw exception"); + } catch (final UnsupportedOperationException e) { + //ignore. + } + } +}