From 813f92c21ad4bd7ffdd8acc66c20d31927e7a67f Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Thu, 9 Jul 2020 20:50:31 +0200 Subject: [PATCH] KAFKA-10179: Pass correct changelog topic to state serdes (#8902) Until now we always passed the default changelog topic name to the state serdes. However, for optimized source tables and global tables the changelog topic is the source topic. Most serdes do not use the topic name passed to them. However, if the serdes actually use the topic name for (de)serialization an org.apache.kafka.common.errors.SerializationException is thrown. This commit passes the correct changelog topic to the state serdes of the metered state stores. Reviewers: A. Sophie Blee-Goldman , Matthias J. Sax , John Roesler --- .../internals/AbstractProcessorContext.java | 16 +- .../internals/GlobalProcessorContextImpl.java | 12 +- .../internals/GlobalStateManagerImpl.java | 23 ++- .../internals/InternalProcessorContext.java | 1 + .../internals/ProcessorContextImpl.java | 29 +-- .../internals/ProcessorContextUtils.java | 6 + .../internals/ProcessorStateManager.java | 30 ++- .../internals/ProcessorTopology.java | 4 +- .../processor/internals/StateManager.java | 4 +- .../state/internals/MeteredKeyValueStore.java | 7 +- .../state/internals/MeteredSessionStore.java | 20 +- .../MeteredTimestampedKeyValueStore.java | 7 +- .../MeteredTimestampedWindowStore.java | 10 +- .../state/internals/MeteredWindowStore.java | 7 +- .../internals/graph/TableSourceNodeTest.java | 76 ++++++++ .../AbstractProcessorContextTest.java | 12 +- .../GlobalProcessorContextImplTest.java | 2 +- .../InternalTopologyBuilderTest.java | 20 ++ .../internals/ProcessorContextImplTest.java | 24 ++- .../internals/ProcessorStateManagerTest.java | 49 +++++ .../processor/internals/StateManagerStub.java | 4 + .../GlobalStateStoreProviderTest.java | 13 +- .../internals/MeteredKeyValueStoreTest.java | 102 +++++++--- .../internals/MeteredSessionStoreTest.java | 181 ++++++++++++------
.../MeteredTimestampedKeyValueStoreTest.java | 128 +++++++++---- .../MeteredTimestampedWindowStoreTest.java | 76 +++++++- .../internals/MeteredWindowStoreTest.java | 64 ++++++- .../kafka/test/GlobalStateManagerStub.java | 5 + .../test/InternalMockProcessorContext.java | 18 +- .../test/MockInternalProcessorContext.java | 5 + .../kafka/test/NoOpProcessorContext.java | 16 +- 31 files changed, 767 insertions(+), 204 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java index cfa91f5b232..3e74ef026d8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java @@ -45,24 +45,23 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte protected ProcessorNode currentNode; private long currentSystemTimeMs; - final StateManager stateManager; protected ThreadCache cache; public AbstractProcessorContext(final TaskId taskId, final StreamsConfig config, final StreamsMetricsImpl metrics, - final StateManager stateManager, final ThreadCache cache) { this.taskId = taskId; this.applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG); this.config = config; this.metrics = metrics; - this.stateManager = stateManager; valueSerde = config.defaultValueSerde(); keySerde = config.defaultKeySerde(); this.cache = cache; } + protected abstract StateManager stateManager(); + @Override public void setSystemTimeMs(final long timeMs) { currentSystemTimeMs = timeMs; @@ -95,7 +94,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte @Override public File stateDir() { - return stateManager.baseDir(); + 
return stateManager().baseDir(); } @Override @@ -110,7 +109,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte throw new IllegalStateException("Can only create state stores during initialization."); } Objects.requireNonNull(store, "store must not be null"); - stateManager.registerStore(store, stateRestoreCallback); + stateManager().registerStore(store, stateRestoreCallback); } /** @@ -223,6 +222,11 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte @Override public TaskType taskType() { - return stateManager.taskType(); + return stateManager().taskType(); + } + + @Override + public String changelogFor(final String storeName) { + return stateManager().changelogFor(storeName); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java index 480e4a065ff..57989df9cb1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java @@ -34,11 +34,19 @@ import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListe public class GlobalProcessorContextImpl extends AbstractProcessorContext { + private final GlobalStateManager stateManager; + public GlobalProcessorContextImpl(final StreamsConfig config, - final StateManager stateMgr, + final GlobalStateManager stateMgr, final StreamsMetricsImpl metrics, final ThreadCache cache) { - super(new TaskId(-1, -1), config, metrics, stateMgr, cache); + super(new TaskId(-1, -1), config, metrics, cache); + stateManager = stateMgr; + } + + @Override + protected StateManager stateManager() { + return stateManager; } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 1def55c82a6..0779dcf5b11 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -59,7 +59,6 @@ import static org.apache.kafka.streams.processor.internals.StateManagerUtil.conv */ public class GlobalStateManagerImpl implements GlobalStateManager { private final Logger log; - private final ProcessorTopology topology; private final Consumer globalConsumer; private final File baseDir; private final StateDirectory stateDirectory; @@ -73,6 +72,8 @@ public class GlobalStateManagerImpl implements GlobalStateManager { private final Set globalNonPersistentStoresTopics = new HashSet<>(); private final OffsetCheckpoint checkpointFile; private final Map checkpointFileCache; + private final Map storeToChangelogTopic; + private final List globalStateStores; public GlobalStateManagerImpl(final LogContext logContext, final ProcessorTopology topology, @@ -80,20 +81,20 @@ public class GlobalStateManagerImpl implements GlobalStateManager { final StateDirectory stateDirectory, final StateRestoreListener stateRestoreListener, final StreamsConfig config) { + storeToChangelogTopic = topology.storeToChangelogTopic(); + globalStateStores = topology.globalStateStores(); baseDir = stateDirectory.globalStateDir(); checkpointFile = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME)); checkpointFileCache = new HashMap<>(); // Find non persistent store's topics - final Map storeToChangelogTopic = topology.storeToChangelogTopic(); - for (final StateStore store : topology.globalStateStores()) { + for (final StateStore store : globalStateStores) { if (!store.persistent()) { - globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name())); + globalNonPersistentStoresTopics.add(changelogFor(store.name())); } } log = 
logContext.logger(GlobalStateManagerImpl.class); - this.topology = topology; this.globalConsumer = globalConsumer; this.stateDirectory = stateDirectory; this.stateRestoreListener = stateRestoreListener; @@ -128,12 +129,10 @@ public class GlobalStateManagerImpl implements GlobalStateManager { throw new StreamsException("Failed to read checkpoints for global state globalStores", e); } - final List stateStores = topology.globalStateStores(); - final Map storeNameToChangelog = topology.storeToChangelogTopic(); final Set changelogTopics = new HashSet<>(); - for (final StateStore stateStore : stateStores) { + for (final StateStore stateStore : globalStateStores) { globalStoreNames.add(stateStore.name()); - final String sourceTopic = storeNameToChangelog.get(stateStore.name()); + final String sourceTopic = storeToChangelogTopic.get(stateStore.name()); changelogTopics.add(sourceTopic); stateStore.init(globalProcessorContext, stateStore); } @@ -226,7 +225,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager { } private List topicPartitionsForStore(final StateStore store) { - final String sourceTopic = topology.storeToChangelogTopic().get(store.name()); + final String sourceTopic = storeToChangelogTopic.get(store.name()); List partitionInfos; int attempts = 0; while (true) { @@ -402,4 +401,8 @@ public class GlobalStateManagerImpl implements GlobalStateManager { public Map changelogOffsets() { return Collections.unmodifiableMap(checkpointFileCache); } + + public String changelogFor(final String storeName) { + return storeToChangelogTopic.get(storeName); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java index 572f18fb804..8d2b3246eb4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java @@ -119,4 +119,5 @@ public interface InternalProcessorContext extends ProcessorContext { final byte[] value, final long timestamp); + String changelogFor(final String storeName); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index b220fa56a58..6baef83fe2c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -18,6 +18,8 @@ package org.apache.kafka.streams.processor.internals; import java.util.HashMap; import java.util.Map; + +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; @@ -25,7 +27,6 @@ import org.apache.kafka.streams.internals.ApiUtils; import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; -import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.To; @@ -49,7 +50,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re private final ToInternal toInternal = new ToInternal(); private final static To SEND_TO_ALL = To.all(); - final Map storeToChangelogTopic = new HashMap<>(); + private final ProcessorStateManager stateManager; + final Map cacheNameToFlushListener = new HashMap<>(); public ProcessorContextImpl(final TaskId id, @@ -57,7 +59,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re 
final ProcessorStateManager stateMgr, final StreamsMetricsImpl metrics, final ThreadCache cache) { - super(id, config, metrics, stateMgr, cache); + super(id, config, metrics, cache); + stateManager = stateMgr; } @Override @@ -96,15 +99,9 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re } } - public ProcessorStateManager stateManager() { - return (ProcessorStateManager) stateManager; - } - @Override - public void register(final StateStore store, - final StateRestoreCallback stateRestoreCallback) { - storeToChangelogTopic.put(store.name(), ProcessorStateManager.storeChangelogTopic(applicationId(), store.name())); - super.register(store, stateRestoreCallback); + public ProcessorStateManager stateManager() { + return stateManager; } @Override @@ -118,16 +115,20 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re final byte[] value, final long timestamp) { throwUnsupportedOperationExceptionIfStandby("logChange"); + + final TopicPartition changelogPartition = stateManager().registeredChangelogPartitionFor(storeName); + // Sending null headers to changelog topics (KIP-244) collector.send( - storeToChangelogTopic.get(storeName), + changelogPartition.topic(), key, value, null, - taskId().partition, + changelogPartition.partition(), timestamp, BYTES_KEY_SERIALIZER, - BYTEARRAY_VALUE_SERIALIZER); + BYTEARRAY_VALUE_SERIALIZER + ); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java index 8f33b67e4d7..78b1ff5ebac 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java @@ -46,4 +46,10 @@ public final class ProcessorContextUtils { public static StreamsMetricsImpl getMetricsImpl(final ProcessorContext context) { 
return (StreamsMetricsImpl) context.metrics(); } + + public static String changelogFor(final ProcessorContext context, final String storeName) { + return context instanceof InternalProcessorContext + ? ((InternalProcessorContext) context).changelogFor(storeName) + : ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 953a4e81c0a..a5001a248fa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -147,8 +147,8 @@ public class ProcessorStateManager implements StateManager { private final TaskId taskId; private final boolean eosEnabled; private final ChangelogRegister changelogReader; - private final Map storeToChangelogTopic; private final Collection sourcePartitions; + private final Map storeToChangelogTopic; // must be maintained in topological order private final FixedOrderMap stores = new FixedOrderMap<>(); @@ -174,7 +174,7 @@ public class ProcessorStateManager implements StateManager { final ChangelogRegister changelogReader, final Map storeToChangelogTopic, final Collection sourcePartitions) throws ProcessorStateException { - + this.storeToChangelogTopic = storeToChangelogTopic; this.log = logContext.logger(ProcessorStateManager.class); this.logPrefix = logContext.logPrefix(); this.taskId = taskId; @@ -182,7 +182,6 @@ public class ProcessorStateManager implements StateManager { this.eosEnabled = eosEnabled; this.changelogReader = changelogReader; this.sourcePartitions = sourcePartitions; - this.storeToChangelogTopic = storeToChangelogTopic; this.baseDir = stateDirectory.directoryForTask(taskId); this.checkpointFile = new 
OffsetCheckpoint(stateDirectory.checkpointFileFor(taskId)); @@ -559,13 +558,13 @@ public class ProcessorStateManager implements StateManager { // NOTE we assume the partition of the topic can always be inferred from the task id; // if user ever use a custom partition grouper (deprecated in KIP-528) this would break and // it is not a regression (it would always break anyways) - return new TopicPartition(storeToChangelogTopic.get(storeName), taskId.partition); + return new TopicPartition(changelogFor(storeName), taskId.partition); } private boolean isLoggingEnabled(final String storeName) { // if the store name does not exist in the changelog map, it means the underlying store // is not log enabled (including global stores) - return storeToChangelogTopic.containsKey(storeName); + return changelogFor(storeName) != null; } private StateStoreMetadata findStore(final TopicPartition changelogPartition) { @@ -590,4 +589,25 @@ public class ProcessorStateManager implements StateManager { private Long changelogOffsetFromCheckpointedOffset(final long offset) { return offset != OFFSET_UNKNOWN ? offset : null; } + + public TopicPartition registeredChangelogPartitionFor(final String storeName) { + final StateStoreMetadata storeMetadata = stores.get(storeName); + if (storeMetadata == null) { + throw new IllegalStateException("State store " + storeName + + " for which the registered changelog partition should be" + + " retrieved has not been registered" + ); + } + if (storeMetadata.changelogPartition == null) { + throw new IllegalStateException("Registered state store " + storeName + + " does not have a registered changelog partition." + + " This may happen if logging is disabled for the state store." 
+ ); + } + return storeMetadata.changelogPartition; + } + + public String changelogFor(final String storeName) { + return storeToChangelogTopic.get(storeName); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java index 51b61b5e795..64d2adcffc5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java @@ -104,11 +104,11 @@ public class ProcessorTopology { } public List globalStateStores() { - return globalStateStores; + return Collections.unmodifiableList(globalStateStores); } public Map storeToChangelogTopic() { - return storeToChangelogTopic; + return Collections.unmodifiableMap(storeToChangelogTopic); } boolean isRepartitionTopic(final String topic) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java index 674ea18be46..6990a9834ad 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java @@ -26,7 +26,7 @@ import java.io.IOException; import java.util.Map; import org.apache.kafka.streams.processor.internals.Task.TaskType; -interface StateManager { +public interface StateManager { File baseDir(); /** @@ -48,6 +48,8 @@ interface StateManager { TaskType taskType(); + String changelogFor(final String storeName); + // TODO: we can remove this when consolidating global state manager into processor state manager StateStore getGlobalStore(final String name); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index c844e034240..d77834b66e6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.KeyValueIterator; @@ -103,8 +104,12 @@ public class MeteredKeyValueStore @SuppressWarnings("unchecked") void initStoreSerde(final ProcessorContext context) { + final String storeName = name(); + final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName); serdes = new StateSerdes<>( - ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), + changelogTopic != null ? + changelogTopic : + ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName), keySerde == null ? (Serde) context.keySerde() : keySerde, valueSerde == null ? 
(Serde) context.valueSerde() : valueSerde); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index c7d4290ab1d..976ca5ecc0f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.KeyValueIterator; @@ -65,14 +66,10 @@ public class MeteredSessionStore this.time = time; } - @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context, final StateStore root) { - serdes = new StateSerdes<>( - ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), - keySerde == null ? (Serde) context.keySerde() : keySerde, - valueSerde == null ? (Serde) context.valueSerde() : valueSerde); + initStoreSerde(context); taskId = context.taskId().toString(); streamsMetrics = (StreamsMetricsImpl) context.metrics(); @@ -87,6 +84,19 @@ public class MeteredSessionStore maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor); } + @SuppressWarnings("unchecked") + private void initStoreSerde(final ProcessorContext context) { + final String storeName = name(); + final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName); + serdes = new StateSerdes<>( + changelogTopic != null ? 
+ changelogTopic : + ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName), + keySerde == null ? (Serde) context.keySerde() : keySerde, + valueSerde == null ? (Serde) context.valueSerde() : valueSerde + ); + } + @SuppressWarnings("unchecked") @Override public boolean setFlushListener(final CacheFlushListener, V> listener, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java index d1446dce758..44be936d92b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StateSerdes; @@ -51,8 +52,12 @@ public class MeteredTimestampedKeyValueStore @SuppressWarnings("unchecked") void initStoreSerde(final ProcessorContext context) { + final String storeName = name(); + final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName); serdes = new StateSerdes<>( - ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), + changelogTopic != null ? + changelogTopic : + ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName), keySerde == null ? (Serde) context.keySerde() : keySerde, valueSerde == null ? 
new ValueAndTimestampSerde<>((Serde) context.valueSerde()) : valueSerde); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java index aecf69f495a..3c386f3f7db 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.TimestampedWindowStore; @@ -50,9 +51,14 @@ class MeteredTimestampedWindowStore @SuppressWarnings("unchecked") @Override void initStoreSerde(final ProcessorContext context) { + final String storeName = name(); + final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName); serdes = new StateSerdes<>( - ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), + changelogTopic != null ? + changelogTopic : + ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName), keySerde == null ? (Serde) context.keySerde() : keySerde, - valueSerde == null ? new ValueAndTimestampSerde<>((Serde) context.valueSerde()) : valueSerde); + valueSerde == null ? 
new ValueAndTimestampSerde<>((Serde) context.valueSerde()) : valueSerde + ); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index fd394681dc4..b253f39a4f6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.KeyValueIterator; @@ -87,8 +88,12 @@ public class MeteredWindowStore @SuppressWarnings("unchecked") void initStoreSerde(final ProcessorContext context) { + final String storeName = name(); + final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName); serdes = new StateSerdes<>( - ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), + changelogTopic != null ? + changelogTopic : + ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName), keySerde == null ? (Serde) context.keySerde() : keySerde, valueSerde == null ? 
(Serde) context.valueSerde() : valueSerde); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java new file mode 100644 index 00000000000..66c55c0f341 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals.graph; + +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.internals.ConsumedInternal; +import org.apache.kafka.streams.kstream.internals.KTableSource; +import org.apache.kafka.streams.kstream.internals.MaterializedInternal; +import org.apache.kafka.streams.kstream.internals.graph.TableSourceNode.TableSourceNodeBuilder; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; +import org.easymock.EasyMock; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.easymock.PowerMock; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({InternalTopologyBuilder.class}) +public class TableSourceNodeTest { + + private static final String STORE_NAME = "store-name"; + private static final String TOPIC = "input-topic"; + + private final InternalTopologyBuilder topologyBuilder = PowerMock.createNiceMock(InternalTopologyBuilder.class); + + @Test + public void shouldConnectStateStoreToInputTopicIfInputTopicIsUsedAsChangelog() { + final boolean shouldReuseSourceTopicForChangelog = true; + topologyBuilder.connectSourceStoreAndTopic(STORE_NAME, TOPIC); + EasyMock.replay(topologyBuilder); + + buildTableSourceNode(shouldReuseSourceTopicForChangelog); + + EasyMock.verify(topologyBuilder); + } + + @Test + public void shouldConnectStateStoreToChangelogTopic() { + final boolean shouldReuseSourceTopicForChangelog = false; + EasyMock.replay(topologyBuilder); + + buildTableSourceNode(shouldReuseSourceTopicForChangelog); + + EasyMock.verify(topologyBuilder); + } + + private void buildTableSourceNode(final boolean shouldReuseSourceTopicForChangelog) { + final TableSourceNodeBuilder tableSourceNodeBuilder = TableSourceNode.tableSourceNodeBuilder(); + final TableSourceNode 
tableSourceNode = tableSourceNodeBuilder + .withTopic(TOPIC) + .withMaterializedInternal(new MaterializedInternal<>(Materialized.as(STORE_NAME))) + .withConsumedInternal(new ConsumedInternal<>(Consumed.as("node-name"))) + .withProcessorParameters( + new ProcessorParameters<>(new KTableSource<>(STORE_NAME, STORE_NAME), null)) + .build(); + tableSourceNode.reuseSourceTopicForChangeLog(shouldReuseSourceTopicForChangelog); + + tableSourceNode.writeToTopology(topologyBuilder); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java index 26caa46ad1c..5d83347c972 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java @@ -196,7 +196,12 @@ public class AbstractProcessorContextTest { } TestProcessorContext(final MockStreamsMetrics metrics) { - super(new TaskId(0, 0), new StreamsConfig(config), metrics, new StateManagerStub(), new ThreadCache(new LogContext("name "), 0, metrics)); + super(new TaskId(0, 0), new StreamsConfig(config), metrics, new ThreadCache(new LogContext("name "), 0, metrics)); + } + + @Override + protected StateManager stateManager() { + return new StateManagerStub(); } @Override @@ -254,5 +259,10 @@ public class AbstractProcessorContextTest { @Override public void registerCacheFlushListener(final String namespace, final DirtyEntryFlushListener listener) { } + + @Override + public String changelogFor(final String storeName) { + return ProcessorStateManager.storeChangelogTopic(applicationId(), storeName); + } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java 
index e4fe6ed9780..7119d5bf585 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java @@ -65,7 +65,7 @@ public class GlobalProcessorContextImplTest { expect(streamsConfig.defaultKeySerde()).andReturn(Serdes.ByteArray()); replay(streamsConfig); - final StateManager stateManager = mock(StateManager.class); + final GlobalStateManager stateManager = mock(GlobalStateManager.class); expect(stateManager.getGlobalStore(GLOBAL_STORE_NAME)).andReturn(mock(StateStore.class)); expect(stateManager.getGlobalStore(GLOBAL_KEY_VALUE_STORE_NAME)).andReturn(mock(KeyValueStore.class)); expect(stateManager.getGlobalStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME)).andReturn(mock(TimestampedKeyValueStore.class)); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index 1e87a0e7432..9bcd47e4a1e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -49,6 +49,7 @@ import static java.time.Duration.ofSeconds; import static java.util.Arrays.asList; import static org.apache.kafka.common.utils.Utils.mkSet; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.not; import static org.hamcrest.core.IsInstanceOf.instanceOf; @@ -1089,4 +1090,23 @@ public class InternalTopologyBuilderTest { new RepartitionTopicConfig("Y-topic-1y", Collections.emptyMap()) ); } + + @Test + public void shouldConnectGlobalStateStoreToInputTopic() { + final String globalStoreName = "global-store"; + final 
String globalTopic = "global-topic"; + builder.setApplicationId("X"); + builder.addGlobalStore( + new MockKeyValueStoreBuilder(globalStoreName, false).withLoggingDisabled(), + "globalSource", + null, + null, + null, + globalTopic, + "global-processor", + new MockProcessorSupplier<>()); + builder.initializeSubscription(); + + assertThat(builder.buildGlobalStateTopology().storeToChangelogTopic().get(globalStoreName), is(globalTopic)); + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java index 58f4eb45d15..d3b05bcbf0f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java @@ -17,6 +17,8 @@ package org.apache.kafka.streams.processor.internals; import java.time.Duration; + +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.StreamsConfig; @@ -72,9 +74,14 @@ public class ProcessorContextImplTest { private RecordCollector recordCollector = mock(RecordCollector.class); private static final String KEY = "key"; + private static final Bytes KEY_BYTES = Bytes.wrap(KEY.getBytes()); private static final long VALUE = 42L; + private static final byte[] VALUE_BYTES = String.valueOf(VALUE).getBytes(); + private static final long TIMESTAMP = 21L; private static final ValueAndTimestamp VALUE_AND_TIMESTAMP = ValueAndTimestamp.make(42L, 21L); private static final String STORE_NAME = "underlying-store"; + private static final String REGISTERED_STORE_NAME = "registered-store"; + private static final TopicPartition CHANGELOG_PARTITION = new TopicPartition("store-changelog", 1); private boolean flushExecuted; private boolean putExecuted; @@ -127,6 +134,7 
@@ public class ProcessorContextImplTest { expect(stateManager.getStore("LocalWindowStore")).andReturn(windowStoreMock()); expect(stateManager.getStore("LocalTimestampedWindowStore")).andReturn(timestampedWindowStoreMock()); expect(stateManager.getStore("LocalSessionStore")).andReturn(sessionStoreMock()); + expect(stateManager.registeredChangelogPartitionFor(REGISTERED_STORE_NAME)).andStubReturn(CHANGELOG_PARTITION); replay(stateManager); @@ -373,16 +381,22 @@ public class ProcessorContextImplTest { @Test public void shouldNotSendRecordHeadersToChangelogTopic() { - final Bytes key = Bytes.wrap("key".getBytes()); - final byte[] value = "zero".getBytes(); - - recordCollector.send(null, key, value, null, 0, 42L, BYTES_KEY_SERIALIZER, BYTEARRAY_VALUE_SERIALIZER); + recordCollector.send( + CHANGELOG_PARTITION.topic(), + KEY_BYTES, + VALUE_BYTES, + null, + CHANGELOG_PARTITION.partition(), + TIMESTAMP, + BYTES_KEY_SERIALIZER, + BYTEARRAY_VALUE_SERIALIZER + ); final StreamTask task = EasyMock.createNiceMock(StreamTask.class); replay(recordCollector, task); context.transitionToActive(task, recordCollector, null); - context.logChange("Store", key, value, 42L); + context.logChange(REGISTERED_STORE_NAME, KEY_BYTES, VALUE_BYTES, TIMESTAMP); verify(recordCollector); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 77ed3a42c3d..f6368d716dc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -148,6 +148,17 @@ public class ProcessorStateManagerTest { Utils.delete(baseDir); } + @Test + public void shouldReturnDefaultChangelogTopicName() { + final String applicationId = "appId"; + final String storeName = "store"; + + assertThat( + 
ProcessorStateManager.storeChangelogTopic(applicationId, storeName), + is(applicationId + "-" + storeName + "-changelog") + ); + } + @Test public void shouldReturnBaseDir() { final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE); @@ -438,6 +449,44 @@ public class ProcessorStateManagerTest { } } + @Test + public void shouldGetChangelogPartitionForRegisteredStore() { + final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE); + stateMgr.registerStore(persistentStore, persistentStore.stateRestoreCallback); + + final TopicPartition changelogPartition = stateMgr.registeredChangelogPartitionFor(persistentStoreName); + + assertThat(changelogPartition.topic(), is(persistentStoreTopicName)); + assertThat(changelogPartition.partition(), is(taskId.partition)); + } + + @Test + public void shouldThrowIfStateStoreIsNotRegistered() { + final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE); + + assertThrows("State store " + persistentStoreName + + " for which the registered changelog partition should be" + + " retrieved has not been registered", + IllegalStateException.class, + () -> stateMgr.registeredChangelogPartitionFor(persistentStoreName) + ); + } + + @Test + public void shouldThrowIfStateStoreHasLoggingDisabled() { + final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE); + final String storeName = "store-with-logging-disabled"; + final MockKeyValueStore storeWithLoggingDisabled = new MockKeyValueStore(storeName, true); + stateMgr.registerStore(storeWithLoggingDisabled, null); + + assertThrows("Registered state store " + storeName + + " does not have a registered changelog partition." 
+ + " This may happen if logging is disabled for the state store.", + IllegalStateException.class, + () -> stateMgr.registeredChangelogPartitionFor(storeName) + ); + } + @Test public void shouldFlushCheckpointAndClose() throws IOException { checkpoint.write(emptyMap()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java index cf1eafd8495..907843d7202 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java @@ -65,4 +65,8 @@ public class StateManagerStub implements StateManager { return null; } + @Override + public String changelogFor(final String storeName) { + return null; + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java index 1ab8684e905..2dfdc6fef54 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java @@ -42,7 +42,7 @@ import java.util.List; import java.util.Map; import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.mock; +import static org.easymock.EasyMock.niceMock; import static org.easymock.EasyMock.replay; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; @@ -89,13 +89,12 @@ public class GlobalStateStoreProviderTest { Serdes.String(), Serdes.String()).build()); - final ProcessorContextImpl mockContext = mock(ProcessorContextImpl.class); - expect(mockContext.applicationId()).andReturn("appId").anyTimes(); + final ProcessorContextImpl mockContext = niceMock(ProcessorContextImpl.class); 
+ expect(mockContext.applicationId()).andStubReturn("appId"); expect(mockContext.metrics()) - .andReturn(new StreamsMetricsImpl(new Metrics(), "threadName", StreamsConfig.METRICS_LATEST)) - .anyTimes(); - expect(mockContext.taskId()).andReturn(new TaskId(0, 0)).anyTimes(); - expect(mockContext.recordCollector()).andReturn(null).anyTimes(); + .andStubReturn(new StreamsMetricsImpl(new Metrics(), "threadName", StreamsConfig.METRICS_LATEST)); + expect(mockContext.taskId()).andStubReturn(new TaskId(0, 0)); + expect(mockContext.recordCollector()).andStubReturn(null); replay(mockContext); for (final StateStore store : stores.values()) { store.init(mockContext, null); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java index 4d666b21c25..17c882cdee2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java @@ -23,13 +23,17 @@ import org.apache.kafka.common.metrics.KafkaMetricsContext; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsContext; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -61,6 +65,7 @@ import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.mock; +import static org.easymock.EasyMock.niceMock; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; import static org.hamcrest.CoreMatchers.equalTo; @@ -78,11 +83,19 @@ public class MeteredKeyValueStoreTest { @Rule public EasyMockRule rule = new EasyMockRule(this); + private static final String APPLICATION_ID = "test-app"; + private static final String STORE_NAME = "store-name"; private static final String STORE_TYPE = "scope"; private static final String STORE_LEVEL_GROUP_FROM_0100_TO_24 = "stream-" + STORE_TYPE + "-state-metrics"; private static final String STORE_LEVEL_GROUP = "stream-state-metrics"; + private static final String CHANGELOG_TOPIC = "changelog-topic"; private static final String THREAD_ID_TAG_KEY_FROM_0100_TO_24 = "client-id"; private static final String THREAD_ID_TAG_KEY = "thread-id"; + private static final String KEY = "key"; + private static final Bytes KEY_BYTES = Bytes.wrap(KEY.getBytes()); + private static final String VALUE = "value"; + private static final byte[] VALUE_BYTES = VALUE.getBytes(); + private static final KeyValue BYTE_KEY_VALUE_PAIR = KeyValue.pair(KEY_BYTES, VALUE_BYTES); private final String threadId = Thread.currentThread().getName(); private final TaskId taskId = new TaskId(0, 0); @@ -93,11 +106,6 @@ public class MeteredKeyValueStoreTest { private InternalProcessorContext context; private MeteredKeyValueStore metered; - private final String key = "key"; - private final Bytes keyBytes = Bytes.wrap(key.getBytes()); - private final String value = "value"; - private final byte[] valueBytes = value.getBytes(); - private 
final KeyValue byteKeyValuePair = KeyValue.pair(keyBytes, valueBytes); private final Metrics metrics = new Metrics(); private String storeLevelGroup; private String threadIdTagKey; @@ -124,10 +132,11 @@ public class MeteredKeyValueStoreTest { Serdes.String() ); metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG); - expect(context.metrics()) - .andReturn(new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion)).anyTimes(); - expect(context.taskId()).andReturn(taskId).anyTimes(); - expect(inner.name()).andReturn("metered").anyTimes(); + expect(context.applicationId()).andStubReturn(APPLICATION_ID); + expect(context.metrics()).andStubReturn(new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion)); + expect(context.taskId()).andStubReturn(taskId); + expect(context.changelogFor(STORE_NAME)).andStubReturn(CHANGELOG_TOPIC); + expect(inner.name()).andStubReturn(STORE_NAME); storeLevelGroup = StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion) ? STORE_LEVEL_GROUP_FROM_0100_TO_24 : STORE_LEVEL_GROUP; threadIdTagKey = @@ -135,7 +144,7 @@ public class MeteredKeyValueStoreTest { tags = mkMap( mkEntry(threadIdTagKey, threadId), mkEntry("task-id", taskId.toString()), - mkEntry(STORE_TYPE + "-state-id", "metered") + mkEntry(STORE_TYPE + "-state-id", STORE_NAME) ); } @@ -144,6 +153,47 @@ public class MeteredKeyValueStoreTest { metered.init(context, metered); } + @Test + public void shouldPassChangelogTopicNameToStateStoreSerde() { + doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC); + } + + @Test + public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() { + final String defaultChangelogTopicName = ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME); + expect(context.changelogFor(STORE_NAME)).andReturn(null); + doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName); + } + + private void doShouldPassChangelogTopicNameToStateStoreSerde(final String topic) { + final Serde 
keySerde = niceMock(Serde.class); + final Serializer keySerializer = mock(Serializer.class); + final Serde valueSerde = niceMock(Serde.class); + final Deserializer valueDeserializer = mock(Deserializer.class); + final Serializer valueSerializer = mock(Serializer.class); + expect(keySerde.serializer()).andStubReturn(keySerializer); + expect(keySerializer.serialize(topic, KEY)).andStubReturn(KEY.getBytes()); + expect(valueSerde.deserializer()).andStubReturn(valueDeserializer); + expect(valueDeserializer.deserialize(topic, VALUE_BYTES)).andStubReturn(VALUE); + expect(valueSerde.serializer()).andStubReturn(valueSerializer); + expect(valueSerializer.serialize(topic, VALUE)).andStubReturn(VALUE_BYTES); + expect(inner.get(KEY_BYTES)).andStubReturn(VALUE_BYTES); + replay(inner, context, keySerializer, keySerde, valueDeserializer, valueSerializer, valueSerde); + metered = new MeteredKeyValueStore<>( + inner, + STORE_TYPE, + new MockTime(), + keySerde, + valueSerde + ); + metered.init(context, metered); + + metered.get(KEY); + metered.put(KEY, VALUE); + + verify(keySerializer, valueDeserializer, valueSerializer); + } + @Test public void testMetrics() { init(); @@ -159,7 +209,7 @@ public class MeteredKeyValueStoreTest { threadId, taskId.toString(), STORE_TYPE, - "metered" + STORE_NAME ))); if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) { assertTrue(reporter.containsMbean(String.format( @@ -176,11 +226,11 @@ public class MeteredKeyValueStoreTest { @Test public void shouldWriteBytesToInnerStoreAndRecordPutMetric() { - inner.put(eq(keyBytes), aryEq(valueBytes)); + inner.put(eq(KEY_BYTES), aryEq(VALUE_BYTES)); expectLastCall(); init(); - metered.put(key, value); + metered.put(KEY, VALUE); final KafkaMetric metric = metric("put-rate"); assertTrue((Double) metric.metricValue() > 0); @@ -189,10 +239,10 @@ public class MeteredKeyValueStoreTest { @Test public void shouldGetBytesFromInnerStoreAndReturnGetMetric() { - 
expect(inner.get(keyBytes)).andReturn(valueBytes); + expect(inner.get(KEY_BYTES)).andReturn(VALUE_BYTES); init(); - assertThat(metered.get(key), equalTo(value)); + assertThat(metered.get(KEY), equalTo(VALUE)); final KafkaMetric metric = metric("get-rate"); assertTrue((Double) metric.metricValue() > 0); @@ -201,10 +251,10 @@ public class MeteredKeyValueStoreTest { @Test public void shouldPutIfAbsentAndRecordPutIfAbsentMetric() { - expect(inner.putIfAbsent(eq(keyBytes), aryEq(valueBytes))).andReturn(null); + expect(inner.putIfAbsent(eq(KEY_BYTES), aryEq(VALUE_BYTES))).andReturn(null); init(); - metered.putIfAbsent(key, value); + metered.putIfAbsent(KEY, VALUE); final KafkaMetric metric = metric("put-if-absent-rate"); assertTrue((Double) metric.metricValue() > 0); @@ -218,7 +268,7 @@ public class MeteredKeyValueStoreTest { expectLastCall(); init(); - metered.putAll(Collections.singletonList(KeyValue.pair(key, value))); + metered.putAll(Collections.singletonList(KeyValue.pair(KEY, VALUE))); final KafkaMetric metric = metric("put-all-rate"); assertTrue((Double) metric.metricValue() > 0); @@ -227,10 +277,10 @@ public class MeteredKeyValueStoreTest { @Test public void shouldDeleteFromInnerStoreAndRecordDeleteMetric() { - expect(inner.delete(keyBytes)).andReturn(valueBytes); + expect(inner.delete(KEY_BYTES)).andReturn(VALUE_BYTES); init(); - metered.delete(key); + metered.delete(KEY); final KafkaMetric metric = metric("delete-rate"); assertTrue((Double) metric.metricValue() > 0); @@ -239,12 +289,12 @@ public class MeteredKeyValueStoreTest { @Test public void shouldGetRangeFromInnerStoreAndRecordRangeMetric() { - expect(inner.range(keyBytes, keyBytes)) - .andReturn(new KeyValueIteratorStub<>(Collections.singletonList(byteKeyValuePair).iterator())); + expect(inner.range(KEY_BYTES, KEY_BYTES)) + .andReturn(new KeyValueIteratorStub<>(Collections.singletonList(BYTE_KEY_VALUE_PAIR).iterator())); init(); - final KeyValueIterator iterator = metered.range(key, key); - 
assertThat(iterator.next().value, equalTo(value)); + final KeyValueIterator iterator = metered.range(KEY, KEY); + assertThat(iterator.next().value, equalTo(VALUE)); assertFalse(iterator.hasNext()); iterator.close(); @@ -255,11 +305,11 @@ public class MeteredKeyValueStoreTest { @Test public void shouldGetAllFromInnerStoreAndRecordAllMetric() { - expect(inner.all()).andReturn(new KeyValueIteratorStub<>(Collections.singletonList(byteKeyValuePair).iterator())); + expect(inner.all()).andReturn(new KeyValueIteratorStub<>(Collections.singletonList(BYTE_KEY_VALUE_PAIR).iterator())); init(); final KeyValueIterator iterator = metered.all(); - assertThat(iterator.next().value, equalTo(value)); + assertThat(iterator.next().value, equalTo(VALUE)); assertFalse(iterator.hasNext()); iterator.close(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java index 7d558dca723..83136f4690a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java @@ -24,7 +24,10 @@ import org.apache.kafka.common.metrics.KafkaMetricsContext; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsContext; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.KeyValue; @@ -33,6 +36,7 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.processor.TaskId; 
import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; @@ -64,6 +68,7 @@ import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.mock; +import static org.easymock.EasyMock.niceMock; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; import static org.hamcrest.CoreMatchers.equalTo; @@ -81,24 +86,32 @@ public class MeteredSessionStoreTest { @Rule public EasyMockRule rule = new EasyMockRule(this); + private static final String APPLICATION_ID = "test-app"; private static final String STORE_TYPE = "scope"; + private static final String STORE_NAME = "mocked-store"; private static final String STORE_LEVEL_GROUP_FROM_0100_TO_24 = "stream-" + STORE_TYPE + "-state-metrics"; private static final String STORE_LEVEL_GROUP = "stream-state-metrics"; private static final String THREAD_ID_TAG_KEY_FROM_0100_TO_24 = "client-id"; private static final String THREAD_ID_TAG_KEY = "thread-id"; + private static final String CHANGELOG_TOPIC = "changelog-topic"; + private static final String KEY = "key"; + private static final Bytes KEY_BYTES = Bytes.wrap(KEY.getBytes()); + private static final Windowed WINDOWED_KEY = new Windowed<>(KEY, new SessionWindow(0, 0)); + private static final Windowed WINDOWED_KEY_BYTES = new Windowed<>(KEY_BYTES, new SessionWindow(0, 0)); + private static final String VALUE = "value"; + private static final byte[] VALUE_BYTES = VALUE.getBytes(); + private static final long START_TIMESTAMP = 24L; + private static final long END_TIMESTAMP = 42L; private final String threadId = Thread.currentThread().getName(); private final TaskId 
taskId = new TaskId(0, 0); private final Metrics metrics = new Metrics(); - private MeteredSessionStore metered; + private MeteredSessionStore store; @Mock(type = MockType.NICE) - private SessionStore inner; + private SessionStore innerStore; @Mock(type = MockType.NICE) private InternalProcessorContext context; - private final String key = "a"; - private final byte[] keyBytes = key.getBytes(); - private final Windowed windowedKeyBytes = new Windowed<>(Bytes.wrap(keyBytes), new SessionWindow(0, 0)); private String storeLevelGroup; private String threadIdTagKey; private Map tags; @@ -116,17 +129,19 @@ public class MeteredSessionStoreTest { @Before public void before() { - metered = new MeteredSessionStore<>( - inner, - "scope", + store = new MeteredSessionStore<>( + innerStore, + STORE_TYPE, Serdes.String(), Serdes.String(), - new MockTime()); + new MockTime() + ); metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG); - expect(context.metrics()) - .andReturn(new StreamsMetricsImpl(metrics, "test-client", builtInMetricsVersion)).anyTimes(); - expect(context.taskId()).andReturn(taskId).anyTimes(); - expect(inner.name()).andReturn("metered").anyTimes(); + expect(context.applicationId()).andStubReturn(APPLICATION_ID); + expect(context.metrics()).andStubReturn(new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion)); + expect(context.taskId()).andStubReturn(taskId); + expect(context.changelogFor(STORE_NAME)).andStubReturn(CHANGELOG_TOPIC); + expect(innerStore.name()).andStubReturn(STORE_NAME); storeLevelGroup = StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion) ? 
STORE_LEVEL_GROUP_FROM_0100_TO_24 : STORE_LEVEL_GROUP; threadIdTagKey = @@ -134,13 +149,55 @@ public class MeteredSessionStoreTest { tags = mkMap( mkEntry(threadIdTagKey, threadId), mkEntry("task-id", taskId.toString()), - mkEntry(STORE_TYPE + "-state-id", "metered") + mkEntry(STORE_TYPE + "-state-id", STORE_NAME) ); } private void init() { - replay(inner, context); - metered.init(context, metered); + replay(innerStore, context); + store.init(context, store); + } + + @Test + public void shouldPassChangelogTopicNameToStateStoreSerde() { + doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC); + } + + @Test + public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() { + final String defaultChangelogTopicName = + ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME); + expect(context.changelogFor(STORE_NAME)).andReturn(null); + doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName); + } + + private void doShouldPassChangelogTopicNameToStateStoreSerde(final String topic) { + final Serde keySerde = niceMock(Serde.class); + final Serializer keySerializer = mock(Serializer.class); + final Serde valueSerde = niceMock(Serde.class); + final Deserializer valueDeserializer = mock(Deserializer.class); + final Serializer valueSerializer = mock(Serializer.class); + expect(keySerde.serializer()).andStubReturn(keySerializer); + expect(keySerializer.serialize(topic, KEY)).andStubReturn(KEY.getBytes()); + expect(valueSerde.deserializer()).andStubReturn(valueDeserializer); + expect(valueDeserializer.deserialize(topic, VALUE_BYTES)).andStubReturn(VALUE); + expect(valueSerde.serializer()).andStubReturn(valueSerializer); + expect(valueSerializer.serialize(topic, VALUE)).andStubReturn(VALUE_BYTES); + expect(innerStore.fetchSession(KEY_BYTES, START_TIMESTAMP, END_TIMESTAMP)).andStubReturn(VALUE_BYTES); + replay(innerStore, context, keySerializer, keySerde, valueDeserializer, valueSerializer, valueSerde); + store = new 
MeteredSessionStore<>( + innerStore, + STORE_TYPE, + keySerde, + valueSerde, + new MockTime() + ); + store.init(context, store); + + store.fetchSession(KEY, START_TIMESTAMP, END_TIMESTAMP); + store.put(WINDOWED_KEY, VALUE); + + verify(keySerializer, valueDeserializer, valueSerializer); } @Test @@ -158,7 +215,7 @@ public class MeteredSessionStoreTest { threadId, taskId.toString(), STORE_TYPE, - "metered" + STORE_NAME ))); if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) { assertTrue(reporter.containsMbean(String.format( @@ -175,97 +232,97 @@ public class MeteredSessionStoreTest { @Test public void shouldWriteBytesToInnerStoreAndRecordPutMetric() { - inner.put(eq(windowedKeyBytes), aryEq(keyBytes)); + innerStore.put(eq(WINDOWED_KEY_BYTES), aryEq(VALUE_BYTES)); expectLastCall(); init(); - metered.put(new Windowed<>(key, new SessionWindow(0, 0)), key); + store.put(WINDOWED_KEY, VALUE); final KafkaMetric metric = metric("put-rate"); assertTrue(((Double) metric.metricValue()) > 0); - verify(inner); + verify(innerStore); } @Test public void shouldFindSessionsFromStoreAndRecordFetchMetric() { - expect(inner.findSessions(Bytes.wrap(keyBytes), 0, 0)) + expect(innerStore.findSessions(KEY_BYTES, 0, 0)) .andReturn(new KeyValueIteratorStub<>( - Collections.singleton(KeyValue.pair(windowedKeyBytes, keyBytes)).iterator())); + Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator())); init(); - final KeyValueIterator, String> iterator = metered.findSessions(key, 0, 0); - assertThat(iterator.next().value, equalTo(key)); + final KeyValueIterator, String> iterator = store.findSessions(KEY, 0, 0); + assertThat(iterator.next().value, equalTo(VALUE)); assertFalse(iterator.hasNext()); iterator.close(); final KafkaMetric metric = metric("fetch-rate"); assertTrue((Double) metric.metricValue() > 0); - verify(inner); + verify(innerStore); } @Test public void shouldFindSessionRangeFromStoreAndRecordFetchMetric() { - 
expect(inner.findSessions(Bytes.wrap(keyBytes), Bytes.wrap(keyBytes), 0, 0)) + expect(innerStore.findSessions(KEY_BYTES, KEY_BYTES, 0, 0)) .andReturn(new KeyValueIteratorStub<>( - Collections.singleton(KeyValue.pair(windowedKeyBytes, keyBytes)).iterator())); + Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator())); init(); - final KeyValueIterator, String> iterator = metered.findSessions(key, key, 0, 0); - assertThat(iterator.next().value, equalTo(key)); + final KeyValueIterator, String> iterator = store.findSessions(KEY, KEY, 0, 0); + assertThat(iterator.next().value, equalTo(VALUE)); assertFalse(iterator.hasNext()); iterator.close(); final KafkaMetric metric = metric("fetch-rate"); assertTrue((Double) metric.metricValue() > 0); - verify(inner); + verify(innerStore); } @Test public void shouldRemoveFromStoreAndRecordRemoveMetric() { - inner.remove(windowedKeyBytes); + innerStore.remove(WINDOWED_KEY_BYTES); expectLastCall(); init(); - metered.remove(new Windowed<>(key, new SessionWindow(0, 0))); + store.remove(new Windowed<>(KEY, new SessionWindow(0, 0))); final KafkaMetric metric = metric("remove-rate"); assertTrue((Double) metric.metricValue() > 0); - verify(inner); + verify(innerStore); } @Test public void shouldFetchForKeyAndRecordFetchMetric() { - expect(inner.fetch(Bytes.wrap(keyBytes))) + expect(innerStore.fetch(KEY_BYTES)) .andReturn(new KeyValueIteratorStub<>( - Collections.singleton(KeyValue.pair(windowedKeyBytes, keyBytes)).iterator())); + Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator())); init(); - final KeyValueIterator, String> iterator = metered.fetch(key); - assertThat(iterator.next().value, equalTo(key)); + final KeyValueIterator, String> iterator = store.fetch(KEY); + assertThat(iterator.next().value, equalTo(VALUE)); assertFalse(iterator.hasNext()); iterator.close(); final KafkaMetric metric = metric("fetch-rate"); assertTrue((Double) metric.metricValue() > 0); - verify(inner); + 
verify(innerStore); } @Test public void shouldFetchRangeFromStoreAndRecordFetchMetric() { - expect(inner.fetch(Bytes.wrap(keyBytes), Bytes.wrap(keyBytes))) + expect(innerStore.fetch(KEY_BYTES, KEY_BYTES)) .andReturn(new KeyValueIteratorStub<>( - Collections.singleton(KeyValue.pair(windowedKeyBytes, keyBytes)).iterator())); + Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator())); init(); - final KeyValueIterator, String> iterator = metered.fetch(key, key); - assertThat(iterator.next().value, equalTo(key)); + final KeyValueIterator, String> iterator = store.fetch(KEY, KEY); + assertThat(iterator.next().value, equalTo(VALUE)); assertFalse(iterator.hasNext()); iterator.close(); final KafkaMetric metric = metric("fetch-rate"); assertTrue((Double) metric.metricValue() > 0); - verify(inner); + verify(innerStore); } @Test @@ -277,50 +334,50 @@ public class MeteredSessionStoreTest { @Test public void shouldNotThrowNullPointerExceptionIfFetchSessionReturnsNull() { - expect(inner.fetchSession(Bytes.wrap("a".getBytes()), 0, Long.MAX_VALUE)).andReturn(null); + expect(innerStore.fetchSession(Bytes.wrap("a".getBytes()), 0, Long.MAX_VALUE)).andReturn(null); init(); - assertNull(metered.fetchSession("a", 0, Long.MAX_VALUE)); + assertNull(store.fetchSession("a", 0, Long.MAX_VALUE)); } @Test(expected = NullPointerException.class) public void shouldThrowNullPointerOnPutIfKeyIsNull() { - metered.put(null, "a"); + store.put(null, "a"); } @Test(expected = NullPointerException.class) public void shouldThrowNullPointerOnRemoveIfKeyIsNull() { - metered.remove(null); + store.remove(null); } @Test(expected = NullPointerException.class) public void shouldThrowNullPointerOnFetchIfKeyIsNull() { - metered.fetch(null); + store.fetch(null); } @Test(expected = NullPointerException.class) public void shouldThrowNullPointerOnFetchRangeIfFromIsNull() { - metered.fetch(null, "to"); + store.fetch(null, "to"); } @Test(expected = NullPointerException.class) public void 
shouldThrowNullPointerOnFetchRangeIfToIsNull() { - metered.fetch("from", null); + store.fetch("from", null); } @Test(expected = NullPointerException.class) public void shouldThrowNullPointerOnFindSessionsIfKeyIsNull() { - metered.findSessions(null, 0, 0); + store.findSessions(null, 0, 0); } @Test(expected = NullPointerException.class) public void shouldThrowNullPointerOnFindSessionsRangeIfFromIsNull() { - metered.findSessions(null, "a", 0, 0); + store.findSessions(null, "a", 0, 0); } @Test(expected = NullPointerException.class) public void shouldThrowNullPointerOnFindSessionsRangeIfToIsNull() { - metered.findSessions("a", null, 0, 0); + store.findSessions("a", null, 0, 0); } private interface CachedSessionStore extends SessionStore, CachedStateStore { } @@ -333,45 +390,45 @@ public class MeteredSessionStoreTest { expect(cachedSessionStore.setFlushListener(anyObject(CacheFlushListener.class), eq(false))).andReturn(true); replay(cachedSessionStore); - metered = new MeteredSessionStore<>( + store = new MeteredSessionStore<>( cachedSessionStore, STORE_TYPE, Serdes.String(), Serdes.String(), new MockTime()); - assertTrue(metered.setFlushListener(null, false)); + assertTrue(store.setFlushListener(null, false)); verify(cachedSessionStore); } @Test public void shouldNotSetFlushListenerOnWrappedNoneCachingStore() { - assertFalse(metered.setFlushListener(null, false)); + assertFalse(store.setFlushListener(null, false)); } @Test public void shouldRemoveMetricsOnClose() { - inner.close(); + innerStore.close(); expectLastCall(); init(); // replays "inner" // There's always a "count" metric registered assertThat(storeMetrics(), not(empty())); - metered.close(); + store.close(); assertThat(storeMetrics(), empty()); - verify(inner); + verify(innerStore); } @Test public void shouldRemoveMetricsEvenIfWrappedStoreThrowsOnClose() { - inner.close(); + innerStore.close(); expectLastCall().andThrow(new RuntimeException("Oops!")); init(); // replays "inner" assertThat(storeMetrics(), 
not(empty())); - assertThrows(RuntimeException.class, metered::close); + assertThrows(RuntimeException.class, store::close); assertThat(storeMetrics(), empty()); - verify(inner); + verify(innerStore); } private KafkaMetric metric(final String name) { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java index 5522855671a..8d5504a14e6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java @@ -23,8 +23,10 @@ import org.apache.kafka.common.metrics.KafkaMetricsContext; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsContext; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.KeyValue; @@ -32,6 +34,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -64,6 +67,7 @@ import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import static 
org.easymock.EasyMock.mock; +import static org.easymock.EasyMock.niceMock; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; import static org.hamcrest.CoreMatchers.equalTo; @@ -79,11 +83,21 @@ public class MeteredTimestampedKeyValueStoreTest { @Rule public EasyMockRule rule = new EasyMockRule(this); + private static final String APPLICATION_ID = "test-app"; + private static final String STORE_NAME = "store-name"; private static final String STORE_TYPE = "scope"; private static final String STORE_LEVEL_GROUP_FROM_0100_TO_24 = "stream-" + STORE_TYPE + "-state-metrics"; private static final String STORE_LEVEL_GROUP = "stream-state-metrics"; + private static final String CHANGELOG_TOPIC = "changelog-topic-name"; private static final String THREAD_ID_TAG_KEY_FROM_0100_TO_24 = "client-id"; private static final String THREAD_ID_TAG_KEY = "thread-id"; + private static final String KEY = "key"; + private static final Bytes KEY_BYTES = Bytes.wrap(KEY.getBytes()); + private static final ValueAndTimestamp VALUE_AND_TIMESTAMP = + ValueAndTimestamp.make("value", 97L); + // timestamp is 97 what is ASCII of 'a' + private static final byte[] VALUE_AND_TIMESTAMP_BYTES = "\0\0\0\0\0\0\0avalue".getBytes(); + private final String threadId = Thread.currentThread().getName(); private final TaskId taskId = new TaskId(0, 0); @@ -93,12 +107,9 @@ public class MeteredTimestampedKeyValueStoreTest { private InternalProcessorContext context; private MeteredTimestampedKeyValueStore metered; - private final String key = "key"; - private final Bytes keyBytes = Bytes.wrap(key.getBytes()); - private final ValueAndTimestamp valueAndTimestamp = ValueAndTimestamp.make("value", 97L); - // timestamp is 97 what is ASCII of 'a' - private final byte[] valueAndTimestampBytes = "\0\0\0\0\0\0\0avalue".getBytes(); - private final KeyValue byteKeyValueTimestampPair = KeyValue.pair(keyBytes, valueAndTimestampBytes); + private final KeyValue byteKeyValueTimestampPair = 
KeyValue.pair(KEY_BYTES, + VALUE_AND_TIMESTAMP_BYTES + ); private final Metrics metrics = new Metrics(); private String storeLevelGroup; private String threadIdTagKey; @@ -125,10 +136,12 @@ public class MeteredTimestampedKeyValueStoreTest { new ValueAndTimestampSerde<>(Serdes.String()) ); metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG); + expect(context.applicationId()).andStubReturn(APPLICATION_ID); expect(context.metrics()) - .andReturn(new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion)).anyTimes(); - expect(context.taskId()).andReturn(taskId).anyTimes(); - expect(inner.name()).andReturn("metered").anyTimes(); + .andStubReturn(new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion)); + expect(context.taskId()).andStubReturn(taskId); + expect(context.changelogFor(STORE_NAME)).andStubReturn(CHANGELOG_TOPIC); + expect(inner.name()).andStubReturn(STORE_NAME); storeLevelGroup = StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion) ? STORE_LEVEL_GROUP_FROM_0100_TO_24 : STORE_LEVEL_GROUP; threadIdTagKey = @@ -136,7 +149,7 @@ public class MeteredTimestampedKeyValueStoreTest { tags = mkMap( mkEntry(threadIdTagKey, threadId), mkEntry("task-id", taskId.toString()), - mkEntry(STORE_TYPE + "-state-id", "metered") + mkEntry(STORE_TYPE + "-state-id", STORE_NAME) ); } @@ -146,6 +159,47 @@ public class MeteredTimestampedKeyValueStoreTest { metered.init(context, metered); } + @Test + public void shouldPassChangelogTopicNameToStateStoreSerde() { + doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC); + } + + @Test + public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() { + final String defaultChangelogTopicName = ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME); + expect(context.changelogFor(STORE_NAME)).andReturn(null); + doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName); + } + + private void doShouldPassChangelogTopicNameToStateStoreSerde(final String 
topic) { + final Serde keySerde = niceMock(Serde.class); + final Serializer keySerializer = mock(Serializer.class); + final Serde> valueSerde = niceMock(Serde.class); + final Deserializer> valueDeserializer = mock(Deserializer.class); + final Serializer> valueSerializer = mock(Serializer.class); + expect(keySerde.serializer()).andStubReturn(keySerializer); + expect(keySerializer.serialize(topic, KEY)).andStubReturn(KEY.getBytes()); + expect(valueSerde.deserializer()).andStubReturn(valueDeserializer); + expect(valueDeserializer.deserialize(topic, VALUE_AND_TIMESTAMP_BYTES)).andStubReturn(VALUE_AND_TIMESTAMP); + expect(valueSerde.serializer()).andStubReturn(valueSerializer); + expect(valueSerializer.serialize(topic, VALUE_AND_TIMESTAMP)).andStubReturn(VALUE_AND_TIMESTAMP_BYTES); + expect(inner.get(KEY_BYTES)).andStubReturn(VALUE_AND_TIMESTAMP_BYTES); + replay(inner, context, keySerializer, keySerde, valueDeserializer, valueSerializer, valueSerde); + metered = new MeteredTimestampedKeyValueStore<>( + inner, + STORE_TYPE, + new MockTime(), + keySerde, + valueSerde + ); + metered.init(context, metered); + + metered.get(KEY); + metered.put(KEY, VALUE_AND_TIMESTAMP); + + verify(keySerializer, valueDeserializer, valueSerializer); + } + @Test public void testMetrics() { init(); @@ -161,7 +215,7 @@ public class MeteredTimestampedKeyValueStoreTest { threadId, taskId.toString(), STORE_TYPE, - "metered" + STORE_NAME ))); if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) { assertTrue(reporter.containsMbean(String.format( @@ -177,11 +231,11 @@ public class MeteredTimestampedKeyValueStoreTest { } @Test public void shouldWriteBytesToInnerStoreAndRecordPutMetric() { - inner.put(eq(keyBytes), aryEq(valueAndTimestampBytes)); + inner.put(eq(KEY_BYTES), aryEq(VALUE_AND_TIMESTAMP_BYTES)); expectLastCall(); init(); - metered.put(key, valueAndTimestamp); + metered.put(KEY, VALUE_AND_TIMESTAMP); final KafkaMetric metric = metric("put-rate"); assertTrue((Double) 
metric.metricValue() > 0); @@ -190,15 +244,15 @@ public class MeteredTimestampedKeyValueStoreTest { @Test public void shouldGetWithBinary() { - expect(inner.get(keyBytes)).andReturn(valueAndTimestampBytes); + expect(inner.get(KEY_BYTES)).andReturn(VALUE_AND_TIMESTAMP_BYTES); - inner.put(eq(keyBytes), aryEq(valueAndTimestampBytes)); + inner.put(eq(KEY_BYTES), aryEq(VALUE_AND_TIMESTAMP_BYTES)); expectLastCall(); init(); - final RawAndDeserializedValue valueWithBinary = metered.getWithBinary(key); - assertEquals(valueWithBinary.value, valueAndTimestamp); - assertEquals(valueWithBinary.serializedValue, valueAndTimestampBytes); + final RawAndDeserializedValue valueWithBinary = metered.getWithBinary(KEY); + assertEquals(valueWithBinary.value, VALUE_AND_TIMESTAMP); + assertEquals(valueWithBinary.serializedValue, VALUE_AND_TIMESTAMP_BYTES); } @SuppressWarnings("resource") @@ -206,40 +260,38 @@ public class MeteredTimestampedKeyValueStoreTest { public void shouldNotPutIfSameValuesAndGreaterTimestamp() { init(); - metered.put(key, valueAndTimestamp); + metered.put(KEY, VALUE_AND_TIMESTAMP); final ValueAndTimestampSerde stringSerde = new ValueAndTimestampSerde<>(Serdes.String()); - final byte[] encodedOldValue = stringSerde.serializer().serialize("TOPIC", valueAndTimestamp); + final byte[] encodedOldValue = stringSerde.serializer().serialize("TOPIC", VALUE_AND_TIMESTAMP); final ValueAndTimestamp newValueAndTimestamp = ValueAndTimestamp.make("value", 98L); - assertFalse(metered.putIfDifferentValues(key, - newValueAndTimestamp, - encodedOldValue)); + assertFalse(metered.putIfDifferentValues(KEY, newValueAndTimestamp, encodedOldValue)); verify(inner); } @SuppressWarnings("resource") @Test public void shouldPutIfOutOfOrder() { - inner.put(eq(keyBytes), aryEq(valueAndTimestampBytes)); + inner.put(eq(KEY_BYTES), aryEq(VALUE_AND_TIMESTAMP_BYTES)); expectLastCall(); init(); - metered.put(key, valueAndTimestamp); + metered.put(KEY, VALUE_AND_TIMESTAMP); final ValueAndTimestampSerde 
stringSerde = new ValueAndTimestampSerde<>(Serdes.String()); - final byte[] encodedOldValue = stringSerde.serializer().serialize("TOPIC", valueAndTimestamp); + final byte[] encodedOldValue = stringSerde.serializer().serialize("TOPIC", VALUE_AND_TIMESTAMP); final ValueAndTimestamp outOfOrderValueAndTimestamp = ValueAndTimestamp.make("value", 95L); - assertTrue(metered.putIfDifferentValues(key, outOfOrderValueAndTimestamp, encodedOldValue)); + assertTrue(metered.putIfDifferentValues(KEY, outOfOrderValueAndTimestamp, encodedOldValue)); verify(inner); } @Test public void shouldGetBytesFromInnerStoreAndReturnGetMetric() { - expect(inner.get(keyBytes)).andReturn(valueAndTimestampBytes); + expect(inner.get(KEY_BYTES)).andReturn(VALUE_AND_TIMESTAMP_BYTES); init(); - assertThat(metered.get(key), equalTo(valueAndTimestamp)); + assertThat(metered.get(KEY), equalTo(VALUE_AND_TIMESTAMP)); final KafkaMetric metric = metric("get-rate"); assertTrue((Double) metric.metricValue() > 0); @@ -248,10 +300,10 @@ public class MeteredTimestampedKeyValueStoreTest { @Test public void shouldPutIfAbsentAndRecordPutIfAbsentMetric() { - expect(inner.putIfAbsent(eq(keyBytes), aryEq(valueAndTimestampBytes))).andReturn(null); + expect(inner.putIfAbsent(eq(KEY_BYTES), aryEq(VALUE_AND_TIMESTAMP_BYTES))).andReturn(null); init(); - metered.putIfAbsent(key, valueAndTimestamp); + metered.putIfAbsent(KEY, VALUE_AND_TIMESTAMP); final KafkaMetric metric = metric("put-if-absent-rate"); assertTrue((Double) metric.metricValue() > 0); @@ -269,7 +321,7 @@ public class MeteredTimestampedKeyValueStoreTest { expectLastCall(); init(); - metered.putAll(Collections.singletonList(KeyValue.pair(key, valueAndTimestamp))); + metered.putAll(Collections.singletonList(KeyValue.pair(KEY, VALUE_AND_TIMESTAMP))); final KafkaMetric metric = metric("put-all-rate"); assertTrue((Double) metric.metricValue() > 0); @@ -278,10 +330,10 @@ public class MeteredTimestampedKeyValueStoreTest { @Test public void 
shouldDeleteFromInnerStoreAndRecordDeleteMetric() { - expect(inner.delete(keyBytes)).andReturn(valueAndTimestampBytes); + expect(inner.delete(KEY_BYTES)).andReturn(VALUE_AND_TIMESTAMP_BYTES); init(); - metered.delete(key); + metered.delete(KEY); final KafkaMetric metric = metric("delete-rate"); assertTrue((Double) metric.metricValue() > 0); @@ -290,12 +342,12 @@ public class MeteredTimestampedKeyValueStoreTest { @Test public void shouldGetRangeFromInnerStoreAndRecordRangeMetric() { - expect(inner.range(keyBytes, keyBytes)).andReturn( + expect(inner.range(KEY_BYTES, KEY_BYTES)).andReturn( new KeyValueIteratorStub<>(Collections.singletonList(byteKeyValueTimestampPair).iterator())); init(); - final KeyValueIterator> iterator = metered.range(key, key); - assertThat(iterator.next().value, equalTo(valueAndTimestamp)); + final KeyValueIterator> iterator = metered.range(KEY, KEY); + assertThat(iterator.next().value, equalTo(VALUE_AND_TIMESTAMP)); assertFalse(iterator.hasNext()); iterator.close(); @@ -311,7 +363,7 @@ public class MeteredTimestampedKeyValueStoreTest { init(); final KeyValueIterator> iterator = metered.all(); - assertThat(iterator.next().value, equalTo(valueAndTimestamp)); + assertThat(iterator.next().value, equalTo(VALUE_AND_TIMESTAMP)); assertFalse(iterator.hasNext()); iterator.close(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java index 4970245b8a2..a75774942eb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java @@ -19,12 +19,16 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import 
org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.WindowStore; @@ -36,25 +40,42 @@ import org.easymock.EasyMock; import org.junit.Before; import org.junit.Test; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.mock; +import static org.easymock.EasyMock.niceMock; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; public class MeteredTimestampedWindowStoreTest { + + private static final String STORE_NAME = "mocked-store"; + private static final String STORE_TYPE = "scope"; + private static final String CHANGELOG_TOPIC = "changelog-topic"; + private static final String KEY = "key"; + private static final Bytes KEY_BYTES = Bytes.wrap(KEY.getBytes()); + // timestamp is 97 what is ASCII of 'a' + private static final long TIMESTAMP = 97L; + private static final ValueAndTimestamp VALUE_AND_TIMESTAMP = + ValueAndTimestamp.make("value", TIMESTAMP); + private static final byte[] VALUE_AND_TIMESTAMP_BYTES = "\0\0\0\0\0\0\0avalue".getBytes(); + private static final int WINDOW_SIZE_MS = 10; + private InternalMockProcessorContext context; - @SuppressWarnings("unchecked") private final WindowStore innerStoreMock = 
EasyMock.createNiceMock(WindowStore.class); - private final MeteredTimestampedWindowStore store = new MeteredTimestampedWindowStore<>( + private final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG)); + private MeteredTimestampedWindowStore store = new MeteredTimestampedWindowStore<>( innerStoreMock, - 10L, // any size - "scope", + WINDOW_SIZE_MS, // any size + STORE_TYPE, new MockTime(), Serdes.String(), new ValueAndTimestampSerde<>(new SerdeThatDoesntHandleNull()) ); - private final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG)); { - EasyMock.expect(innerStoreMock.name()).andReturn("mocked-store").anyTimes(); + EasyMock.expect(innerStoreMock.name()).andStubReturn(STORE_NAME); } @Before @@ -73,6 +94,49 @@ public class MeteredTimestampedWindowStoreTest { ); } + @Test + public void shouldPassChangelogTopicNameToStateStoreSerde() { + context.addChangelogForStore(STORE_NAME, CHANGELOG_TOPIC); + doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC); + } + + @Test + public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() { + final String defaultChangelogTopicName = + ProcessorStateManager.storeChangelogTopic(context.applicationId(), STORE_NAME); + doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName); + } + + private void doShouldPassChangelogTopicNameToStateStoreSerde(final String topic) { + final Serde keySerde = niceMock(Serde.class); + final Serializer keySerializer = mock(Serializer.class); + final Serde> valueSerde = niceMock(Serde.class); + final Deserializer> valueDeserializer = mock(Deserializer.class); + final Serializer> valueSerializer = mock(Serializer.class); + expect(keySerde.serializer()).andStubReturn(keySerializer); + expect(keySerializer.serialize(topic, KEY)).andStubReturn(KEY.getBytes()); + expect(valueSerde.deserializer()).andStubReturn(valueDeserializer); + expect(valueDeserializer.deserialize(topic, 
VALUE_AND_TIMESTAMP_BYTES)).andStubReturn(VALUE_AND_TIMESTAMP); + expect(valueSerde.serializer()).andStubReturn(valueSerializer); + expect(valueSerializer.serialize(topic, VALUE_AND_TIMESTAMP)).andStubReturn(VALUE_AND_TIMESTAMP_BYTES); + expect(innerStoreMock.fetch(KEY_BYTES, TIMESTAMP)).andStubReturn(VALUE_AND_TIMESTAMP_BYTES); + replay(innerStoreMock, keySerializer, keySerde, valueDeserializer, valueSerializer, valueSerde); + store = new MeteredTimestampedWindowStore<>( + innerStoreMock, + WINDOW_SIZE_MS, + STORE_TYPE, + new MockTime(), + keySerde, + valueSerde + ); + store.init(context, store); + + store.fetch(KEY, TIMESTAMP); + store.put(KEY, VALUE_AND_TIMESTAMP, TIMESTAMP); + + verify(keySerializer, valueDeserializer, valueSerializer); + } + @Test public void shouldCloseUnderlyingStore() { innerStoreMock.close(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java index 2e1fb12ec48..3f7eb1de2db 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java @@ -24,12 +24,16 @@ import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsContext; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Windowed; +import 
org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.test.InternalMockProcessorContext; @@ -61,6 +65,7 @@ import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.mock; +import static org.easymock.EasyMock.niceMock; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; import static org.hamcrest.MatcherAssert.assertThat; @@ -81,14 +86,20 @@ public class MeteredWindowStoreTest { private static final String THREAD_ID_TAG_KEY_FROM_0100_TO_24 = "client-id"; private static final String THREAD_ID_TAG_KEY = "thread-id"; private static final String STORE_NAME = "mocked-store"; + private static final String CHANGELOG_TOPIC = "changelog-topic"; + private static final String KEY = "key"; + private static final Bytes KEY_BYTES = Bytes.wrap(KEY.getBytes()); + private static final String VALUE = "value"; + private static final byte[] VALUE_BYTES = VALUE.getBytes(); + private static final int WINDOW_SIZE_MS = 10; + private static final long TIMESTAMP = 42L; private final String threadId = Thread.currentThread().getName(); private InternalMockProcessorContext context; - @SuppressWarnings("unchecked") private final WindowStore innerStoreMock = createNiceMock(WindowStore.class); - private final MeteredWindowStore store = new MeteredWindowStore<>( + private MeteredWindowStore store = new MeteredWindowStore<>( innerStoreMock, - 10L, // any size + WINDOW_SIZE_MS, // any size STORE_TYPE, new MockTime(), Serdes.String(), @@ -116,9 +127,7 @@ public class MeteredWindowStoreTest { @Before public void setUp() { - final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion); - + final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, "test", builtInMetricsVersion); context = new InternalMockProcessorContext( TestUtils.tempDirectory(), Serdes.String(), @@ -139,6 +148,49 @@ public class MeteredWindowStoreTest { ); } + @Test + public void shouldPassChangelogTopicNameToStateStoreSerde() { + context.addChangelogForStore(STORE_NAME, CHANGELOG_TOPIC); + doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC); + } + + @Test + public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() { + final String defaultChangelogTopicName = + ProcessorStateManager.storeChangelogTopic(context.applicationId(), STORE_NAME); + doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName); + } + + private void doShouldPassChangelogTopicNameToStateStoreSerde(final String topic) { + final Serde keySerde = niceMock(Serde.class); + final Serializer keySerializer = mock(Serializer.class); + final Serde valueSerde = niceMock(Serde.class); + final Deserializer valueDeserializer = mock(Deserializer.class); + final Serializer valueSerializer = mock(Serializer.class); + expect(keySerde.serializer()).andStubReturn(keySerializer); + expect(keySerializer.serialize(topic, KEY)).andStubReturn(KEY.getBytes()); + expect(valueSerde.deserializer()).andStubReturn(valueDeserializer); + expect(valueDeserializer.deserialize(topic, VALUE_BYTES)).andStubReturn(VALUE); + expect(valueSerde.serializer()).andStubReturn(valueSerializer); + expect(valueSerializer.serialize(topic, VALUE)).andStubReturn(VALUE_BYTES); + expect(innerStoreMock.fetch(KEY_BYTES, TIMESTAMP)).andStubReturn(VALUE_BYTES); + replay(innerStoreMock, keySerializer, keySerde, valueDeserializer, valueSerializer, valueSerde); + store = new MeteredWindowStore<>( + innerStoreMock, + WINDOW_SIZE_MS, + STORE_TYPE, + new MockTime(), + keySerde, + valueSerde + ); + store.init(context, store); + + store.fetch(KEY, TIMESTAMP); + store.put(KEY, VALUE, TIMESTAMP); + + verify(keySerializer, valueDeserializer, 
valueSerializer); + } + @Test public void testMetrics() { replay(innerStoreMock); diff --git a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java index ae825bc4492..48e7295c6b1 100644 --- a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java +++ b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java @@ -88,4 +88,9 @@ public class GlobalStateManagerStub implements GlobalStateManager { public TaskType taskType() { return TaskType.GLOBAL; } + + @Override + public String changelogFor(final String storeName) { + return null; + } } diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java index fd96b7f881b..bf2bcf07c39 100644 --- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java @@ -37,6 +37,8 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; import org.apache.kafka.streams.processor.internals.RecordCollector; +import org.apache.kafka.streams.processor.internals.StateManager; +import org.apache.kafka.streams.processor.internals.StateManagerStub; import org.apache.kafka.streams.processor.internals.StreamTask; import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.processor.internals.ToInternal; @@ -70,6 +72,7 @@ public class InternalMockProcessorContext private Serde keySerde; private Serde valueSerde; private long timestamp = -1L; + private final Map storeToChangelogTopic = new HashMap<>(); public InternalMockProcessorContext() { this(null, @@ -182,7 +185,6 @@ public class 
InternalMockProcessorContext new TaskId(0, 0), config, metrics, - null, cache ); super.setCurrentNode(new ProcessorNode<>("TESTING_NODE")); @@ -193,6 +195,11 @@ public class InternalMockProcessorContext this.metrics().setRocksDBMetricsRecordingTrigger(new RocksDBMetricsRecordingTrigger(new SystemTime())); } + @Override + protected StateManager stateManager() { + return new StateManagerStub(); + } + @Override public RecordCollector recordCollector() { final RecordCollector recordCollector = recordCollectorSupplier.recordCollector(); @@ -400,4 +407,13 @@ public class InternalMockProcessorContext } restoreCallback.restoreBatch(records); } + + public void addChangelogForStore(final String storeName, final String changelogTopic) { + storeToChangelogTopic.put(storeName, changelogTopic); + } + + @Override + public String changelogFor(final String storeName) { + return storeToChangelogTopic.get(storeName); + } } diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java index c62f69a1a3d..58b90c1268c 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java @@ -145,4 +145,9 @@ public class MockInternalProcessorContext extends MockProcessorContext implement @Override public void registerCacheFlushListener(final String namespace, final DirtyEntryFlushListener listener) { } + + @Override + public String changelogFor(final String storeName) { + return "mock-changelog"; + } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java index 88d5fcaacd4..b2b62eeb603 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java @@ -33,7 
+33,11 @@ import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Properties; + +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.RecordCollector; +import org.apache.kafka.streams.processor.internals.StateManager; +import org.apache.kafka.streams.processor.internals.StateManagerStub; import org.apache.kafka.streams.processor.internals.StreamTask; import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.state.internals.ThreadCache; @@ -45,7 +49,7 @@ public class NoOpProcessorContext extends AbstractProcessorContext { public Map forwardedValues = new HashMap<>(); public NoOpProcessorContext() { - super(new TaskId(1, 1), streamsConfig(), new MockStreamsMetrics(new Metrics()), null, null); + super(new TaskId(1, 1), streamsConfig(), new MockStreamsMetrics(new Metrics()), null); } private static StreamsConfig streamsConfig() { @@ -55,6 +59,11 @@ public class NoOpProcessorContext extends AbstractProcessorContext { return new StreamsConfig(props); } + @Override + protected StateManager stateManager() { + return new StateManagerStub(); + } + @Override public StateStore getStateStore(final String name) { return null; @@ -134,4 +143,9 @@ public class NoOpProcessorContext extends AbstractProcessorContext { public void registerCacheFlushListener(final String namespace, final DirtyEntryFlushListener listener) { cache.addDirtyEntryFlushListener(namespace, listener); } + + @Override + public String changelogFor(final String storeName) { + return ProcessorStateManager.storeChangelogTopic(applicationId(), storeName); + } }