
KAFKA-10179: Pass correct changelog topic to state serdes (#8902)

Until now we always passed the default changelog topic name
to the state serdes. However, for optimized source tables
and global tables the changelog topic is the source topic.

Most serdes do not use the topic name passed to them. However,
if a serde does use the topic name for (de)serialization, passing
the wrong topic causes an
org.apache.kafka.common.errors.SerializationException to be thrown.

This commit passes the correct changelog topic to the state
serdes of the metered state stores.
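
Most serdes ignore the topic parameter, but a topic-sensitive serde
(for example, one backed by a per-topic schema registry) fails when
handed a topic name it does not know. A minimal illustrative sketch,
not part of this commit; the registry map is a hypothetical stand-in:

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class TopicSensitiveSerializer implements Serializer<String> {

    // Hypothetical per-topic lookup; a real implementation would consult
    // an external schema registry.
    private final Map<String, Integer> schemaIdByTopic;

    public TopicSensitiveSerializer(final Map<String, Integer> schemaIdByTopic) {
        this.schemaIdByTopic = schemaIdByTopic;
    }

    @Override
    public byte[] serialize(final String topic, final String data) {
        // Before this fix, state serdes were always handed the default
        // changelog name "<application.id>-<store>-changelog", even when
        // the store actually restores from its source topic, so a lookup
        // keyed on the topic name fails here.
        final Integer schemaId = schemaIdByTopic.get(topic);
        if (schemaId == null) {
            throw new SerializationException("No schema registered for topic " + topic);
        }
        return data.getBytes(StandardCharsets.UTF_8);
    }
}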

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Matthias J. Sax <matthias@confluent.io>, John Roesler <vvcephei@apache.org>
Bruno Cadonna committed (via GitHub)
commit 813f92c21a
31 changed files (number of changed lines in parentheses):

  1. streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java (16)
  2. streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java (12)
  3. streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java (23)
  4. streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java (1)
  5. streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java (29)
  6. streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java (6)
  7. streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java (30)
  8. streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java (4)
  9. streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java (4)
  10. streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java (7)
  11. streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java (20)
  12. streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java (7)
  13. streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java (10)
  14. streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java (7)
  15. streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java (76)
  16. streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java (12)
  17. streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java (2)
  18. streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java (20)
  19. streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java (24)
  20. streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java (49)
  21. streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java (4)
  22. streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java (13)
  23. streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java (102)
  24. streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java (181)
  25. streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java (128)
  26. streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java (76)
  27. streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java (64)
  28. streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java (5)
  29. streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java (18)
  30. streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java (5)
  31. streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java (16)

streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java (16 lines changed)

@@ -45,24 +45,23 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
protected ProcessorNode<?, ?> currentNode;
private long currentSystemTimeMs;
final StateManager stateManager;
protected ThreadCache cache;
public AbstractProcessorContext(final TaskId taskId,
final StreamsConfig config,
final StreamsMetricsImpl metrics,
final StateManager stateManager,
final ThreadCache cache) {
this.taskId = taskId;
this.applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
this.config = config;
this.metrics = metrics;
this.stateManager = stateManager;
valueSerde = config.defaultValueSerde();
keySerde = config.defaultKeySerde();
this.cache = cache;
}
protected abstract StateManager stateManager();
@Override
public void setSystemTimeMs(final long timeMs) {
currentSystemTimeMs = timeMs;
@@ -95,7 +94,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
@Override
public File stateDir() {
return stateManager.baseDir();
return stateManager().baseDir();
}
@Override
@@ -110,7 +109,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
throw new IllegalStateException("Can only create state stores during initialization.");
}
Objects.requireNonNull(store, "store must not be null");
stateManager.registerStore(store, stateRestoreCallback);
stateManager().registerStore(store, stateRestoreCallback);
}
/**
@@ -223,6 +222,11 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
@Override
public TaskType taskType() {
return stateManager.taskType();
return stateManager().taskType();
}
@Override
public String changelogFor(final String storeName) {
return stateManager().changelogFor(storeName);
}
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java (12 lines changed)

@@ -34,11 +34,19 @@ import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListe
public class GlobalProcessorContextImpl extends AbstractProcessorContext {
private final GlobalStateManager stateManager;
public GlobalProcessorContextImpl(final StreamsConfig config,
final StateManager stateMgr,
final GlobalStateManager stateMgr,
final StreamsMetricsImpl metrics,
final ThreadCache cache) {
super(new TaskId(-1, -1), config, metrics, stateMgr, cache);
super(new TaskId(-1, -1), config, metrics, cache);
stateManager = stateMgr;
}
@Override
protected StateManager stateManager() {
return stateManager;
}
@Override

streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java (23 lines changed)

@@ -59,7 +59,6 @@ import static org.apache.kafka.streams.processor.internals.StateManagerUtil.conv
*/
public class GlobalStateManagerImpl implements GlobalStateManager {
private final Logger log;
private final ProcessorTopology topology;
private final Consumer<byte[], byte[]> globalConsumer;
private final File baseDir;
private final StateDirectory stateDirectory;
@@ -73,6 +72,8 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
private final Set<String> globalNonPersistentStoresTopics = new HashSet<>();
private final OffsetCheckpoint checkpointFile;
private final Map<TopicPartition, Long> checkpointFileCache;
private final Map<String, String> storeToChangelogTopic;
private final List<StateStore> globalStateStores;
public GlobalStateManagerImpl(final LogContext logContext,
final ProcessorTopology topology,
@@ -80,20 +81,20 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
final StateDirectory stateDirectory,
final StateRestoreListener stateRestoreListener,
final StreamsConfig config) {
storeToChangelogTopic = topology.storeToChangelogTopic();
globalStateStores = topology.globalStateStores();
baseDir = stateDirectory.globalStateDir();
checkpointFile = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
checkpointFileCache = new HashMap<>();
// Find non persistent store's topics
final Map<String, String> storeToChangelogTopic = topology.storeToChangelogTopic();
for (final StateStore store : topology.globalStateStores()) {
for (final StateStore store : globalStateStores) {
if (!store.persistent()) {
globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name()));
globalNonPersistentStoresTopics.add(changelogFor(store.name()));
}
}
log = logContext.logger(GlobalStateManagerImpl.class);
this.topology = topology;
this.globalConsumer = globalConsumer;
this.stateDirectory = stateDirectory;
this.stateRestoreListener = stateRestoreListener;
@@ -128,12 +129,10 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
throw new StreamsException("Failed to read checkpoints for global state globalStores", e);
}
final List<StateStore> stateStores = topology.globalStateStores();
final Map<String, String> storeNameToChangelog = topology.storeToChangelogTopic();
final Set<String> changelogTopics = new HashSet<>();
for (final StateStore stateStore : stateStores) {
for (final StateStore stateStore : globalStateStores) {
globalStoreNames.add(stateStore.name());
final String sourceTopic = storeNameToChangelog.get(stateStore.name());
final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
changelogTopics.add(sourceTopic);
stateStore.init(globalProcessorContext, stateStore);
}
@@ -226,7 +225,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
}
private List<TopicPartition> topicPartitionsForStore(final StateStore store) {
final String sourceTopic = topology.storeToChangelogTopic().get(store.name());
final String sourceTopic = storeToChangelogTopic.get(store.name());
List<PartitionInfo> partitionInfos;
int attempts = 0;
while (true) {
@@ -402,4 +401,8 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
public Map<TopicPartition, Long> changelogOffsets() {
return Collections.unmodifiableMap(checkpointFileCache);
}
public String changelogFor(final String storeName) {
return storeToChangelogTopic.get(storeName);
}
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java (1 line changed)

@@ -119,4 +119,5 @@ public interface InternalProcessorContext extends ProcessorContext {
final byte[] value,
final long timestamp);
String changelogFor(final String storeName);
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java (29 lines changed)

@@ -18,6 +18,8 @@ package org.apache.kafka.streams.processor.internals;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
@@ -25,7 +27,6 @@ import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
@@ -49,7 +50,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
private final ToInternal toInternal = new ToInternal();
private final static To SEND_TO_ALL = To.all();
final Map<String, String> storeToChangelogTopic = new HashMap<>();
private final ProcessorStateManager stateManager;
final Map<String, DirtyEntryFlushListener> cacheNameToFlushListener = new HashMap<>();
public ProcessorContextImpl(final TaskId id,
@@ -57,7 +59,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
final ProcessorStateManager stateMgr,
final StreamsMetricsImpl metrics,
final ThreadCache cache) {
super(id, config, metrics, stateMgr, cache);
super(id, config, metrics, cache);
stateManager = stateMgr;
}
@Override
@@ -96,15 +99,9 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
}
public ProcessorStateManager stateManager() {
return (ProcessorStateManager) stateManager;
}
@Override
public void register(final StateStore store,
final StateRestoreCallback stateRestoreCallback) {
storeToChangelogTopic.put(store.name(), ProcessorStateManager.storeChangelogTopic(applicationId(), store.name()));
super.register(store, stateRestoreCallback);
public ProcessorStateManager stateManager() {
return stateManager;
}
@Override
@@ -118,16 +115,20 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
final byte[] value,
final long timestamp) {
throwUnsupportedOperationExceptionIfStandby("logChange");
final TopicPartition changelogPartition = stateManager().registeredChangelogPartitionFor(storeName);
// Sending null headers to changelog topics (KIP-244)
collector.send(
storeToChangelogTopic.get(storeName),
changelogPartition.topic(),
key,
value,
null,
taskId().partition,
changelogPartition.partition(),
timestamp,
BYTES_KEY_SERIALIZER,
BYTEARRAY_VALUE_SERIALIZER);
BYTEARRAY_VALUE_SERIALIZER
);
}
/**

streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java (6 lines changed)

@@ -46,4 +46,10 @@ public final class ProcessorContextUtils {
public static StreamsMetricsImpl getMetricsImpl(final ProcessorContext context) {
return (StreamsMetricsImpl) context.metrics();
}
public static String changelogFor(final ProcessorContext context, final String storeName) {
return context instanceof InternalProcessorContext
? ((InternalProcessorContext) context).changelogFor(storeName)
: ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
}
}
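
The helper above resolves the changelog topic actually used for restoration
and falls back to the default <application.id>-<store>-changelog naming for
contexts that do not implement InternalProcessorContext. In the optimized
source-table case the resolved topic is the input topic itself. A minimal
sketch of such a topology, not part of this commit; the application id,
topic, and store names are hypothetical:

import java.util.Properties;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Materialized;

public class OptimizedTableExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Reuse the source topic as the table's changelog instead of
        // creating "example-app-table-store-changelog".
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

        final StreamsBuilder builder = new StreamsBuilder();
        builder.table("input-topic", Materialized.as("table-store"));

        // Passing the properties to build() applies the optimization; with
        // this commit, the state serdes of "table-store" see "input-topic".
        final Topology topology = builder.build(props);
        System.out.println(topology.describe());
    }
}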

streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java (30 lines changed)

@@ -147,8 +147,8 @@ public class ProcessorStateManager implements StateManager {
private final TaskId taskId;
private final boolean eosEnabled;
private final ChangelogRegister changelogReader;
private final Map<String, String> storeToChangelogTopic;
private final Collection<TopicPartition> sourcePartitions;
private final Map<String, String> storeToChangelogTopic;
// must be maintained in topological order
private final FixedOrderMap<String, StateStoreMetadata> stores = new FixedOrderMap<>();
@@ -174,7 +174,7 @@ public class ProcessorStateManager implements StateManager {
final ChangelogRegister changelogReader,
final Map<String, String> storeToChangelogTopic,
final Collection<TopicPartition> sourcePartitions) throws ProcessorStateException {
this.storeToChangelogTopic = storeToChangelogTopic;
this.log = logContext.logger(ProcessorStateManager.class);
this.logPrefix = logContext.logPrefix();
this.taskId = taskId;
@@ -182,7 +182,6 @@ public class ProcessorStateManager implements StateManager {
this.eosEnabled = eosEnabled;
this.changelogReader = changelogReader;
this.sourcePartitions = sourcePartitions;
this.storeToChangelogTopic = storeToChangelogTopic;
this.baseDir = stateDirectory.directoryForTask(taskId);
this.checkpointFile = new OffsetCheckpoint(stateDirectory.checkpointFileFor(taskId));
@@ -559,13 +558,13 @@ public class ProcessorStateManager implements StateManager {
// NOTE we assume the partition of the topic can always be inferred from the task id;
// if user ever use a custom partition grouper (deprecated in KIP-528) this would break and
// it is not a regression (it would always break anyways)
return new TopicPartition(storeToChangelogTopic.get(storeName), taskId.partition);
return new TopicPartition(changelogFor(storeName), taskId.partition);
}
private boolean isLoggingEnabled(final String storeName) {
// if the store name does not exist in the changelog map, it means the underlying store
// is not log enabled (including global stores)
return storeToChangelogTopic.containsKey(storeName);
return changelogFor(storeName) != null;
}
private StateStoreMetadata findStore(final TopicPartition changelogPartition) {
@@ -590,4 +589,25 @@ public class ProcessorStateManager implements StateManager {
private Long changelogOffsetFromCheckpointedOffset(final long offset) {
return offset != OFFSET_UNKNOWN ? offset : null;
}
public TopicPartition registeredChangelogPartitionFor(final String storeName) {
final StateStoreMetadata storeMetadata = stores.get(storeName);
if (storeMetadata == null) {
throw new IllegalStateException("State store " + storeName
+ " for which the registered changelog partition should be"
+ " retrieved has not been registered"
);
}
if (storeMetadata.changelogPartition == null) {
throw new IllegalStateException("Registered state store " + storeName
+ " does not have a registered changelog partition."
+ " This may happen if logging is disabled for the state store."
);
}
return storeMetadata.changelogPartition;
}
public String changelogFor(final String storeName) {
return storeToChangelogTopic.get(storeName);
}
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java (4 lines changed)

@@ -104,11 +104,11 @@ public class ProcessorTopology {
}
public List<StateStore> globalStateStores() {
return globalStateStores;
return Collections.unmodifiableList(globalStateStores);
}
public Map<String, String> storeToChangelogTopic() {
return storeToChangelogTopic;
return Collections.unmodifiableMap(storeToChangelogTopic);
}
boolean isRepartitionTopic(final String topic) {

streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java (4 lines changed)

@@ -26,7 +26,7 @@ import java.io.IOException;
import java.util.Map;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
interface StateManager {
public interface StateManager {
File baseDir();
/**
@@ -48,6 +48,8 @@ interface StateManager {
TaskType taskType();
String changelogFor(final String storeName);
// TODO: we can remove this when consolidating global state manager into processor state manager
StateStore getGlobalStore(final String name);
}

streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java (7 lines changed)

@@ -24,6 +24,7 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -103,8 +104,12 @@ public class MeteredKeyValueStore<K, V>
@SuppressWarnings("unchecked")
void initStoreSerde(final ProcessorContext context) {
final String storeName = name();
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
changelogTopic != null ?
changelogTopic :
ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
}

streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java (20 lines changed)

@@ -24,6 +24,7 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -65,14 +66,10 @@ public class MeteredSessionStore<K, V>
this.time = time;
}
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context,
final StateStore root) {
serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
initStoreSerde(context);
taskId = context.taskId().toString();
streamsMetrics = (StreamsMetricsImpl) context.metrics();
@@ -87,6 +84,19 @@ public class MeteredSessionStore<K, V>
maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
}
@SuppressWarnings("unchecked")
private void initStoreSerde(final ProcessorContext context) {
final String storeName = name();
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
serdes = new StateSerdes<>(
changelogTopic != null ?
changelogTopic :
ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde
);
}
@SuppressWarnings("unchecked")
@Override
public boolean setFlushListener(final CacheFlushListener<Windowed<K>, V> listener,

streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java (7 lines changed)

@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StateSerdes;
@@ -51,8 +52,12 @@ public class MeteredTimestampedKeyValueStore<K, V>
@SuppressWarnings("unchecked")
void initStoreSerde(final ProcessorContext context) {
final String storeName = name();
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
changelogTopic != null ?
changelogTopic :
ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde);
}

streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java (10 lines changed)

@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.TimestampedWindowStore;
@@ -50,9 +51,14 @@ class MeteredTimestampedWindowStore<K, V>
@SuppressWarnings("unchecked")
@Override
void initStoreSerde(final ProcessorContext context) {
final String storeName = name();
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
changelogTopic != null ?
changelogTopic :
ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde);
valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde
);
}
}

streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java (7 lines changed)

@@ -24,6 +24,7 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -87,8 +88,12 @@ public class MeteredWindowStore<K, V>
@SuppressWarnings("unchecked")
void initStoreSerde(final ProcessorContext context) {
final String storeName = name();
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
changelogTopic != null ?
changelogTopic :
ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
}

streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java (76 lines changed)

@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.graph;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.kstream.internals.graph.TableSourceNode.TableSourceNodeBuilder;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.easymock.EasyMock;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
@PrepareForTest({InternalTopologyBuilder.class})
public class TableSourceNodeTest {
private static final String STORE_NAME = "store-name";
private static final String TOPIC = "input-topic";
private final InternalTopologyBuilder topologyBuilder = PowerMock.createNiceMock(InternalTopologyBuilder.class);
@Test
public void shouldConnectStateStoreToInputTopicIfInputTopicIsUsedAsChangelog() {
final boolean shouldReuseSourceTopicForChangelog = true;
topologyBuilder.connectSourceStoreAndTopic(STORE_NAME, TOPIC);
EasyMock.replay(topologyBuilder);
buildTableSourceNode(shouldReuseSourceTopicForChangelog);
EasyMock.verify(topologyBuilder);
}
@Test
public void shouldConnectStateStoreToChangelogTopic() {
final boolean shouldReuseSourceTopicForChangelog = false;
EasyMock.replay(topologyBuilder);
buildTableSourceNode(shouldReuseSourceTopicForChangelog);
EasyMock.verify(topologyBuilder);
}
private void buildTableSourceNode(final boolean shouldReuseSourceTopicForChangelog) {
final TableSourceNodeBuilder<String, String> tableSourceNodeBuilder = TableSourceNode.tableSourceNodeBuilder();
final TableSourceNode<String, String> tableSourceNode = tableSourceNodeBuilder
.withTopic(TOPIC)
.withMaterializedInternal(new MaterializedInternal<>(Materialized.as(STORE_NAME)))
.withConsumedInternal(new ConsumedInternal<>(Consumed.as("node-name")))
.withProcessorParameters(
new ProcessorParameters<>(new KTableSource<>(STORE_NAME, STORE_NAME), null))
.build();
tableSourceNode.reuseSourceTopicForChangeLog(shouldReuseSourceTopicForChangelog);
tableSourceNode.writeToTopology(topologyBuilder);
}
}

streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java (12 lines changed)

@@ -196,7 +196,12 @@ public class AbstractProcessorContextTest {
}
TestProcessorContext(final MockStreamsMetrics metrics) {
super(new TaskId(0, 0), new StreamsConfig(config), metrics, new StateManagerStub(), new ThreadCache(new LogContext("name "), 0, metrics));
super(new TaskId(0, 0), new StreamsConfig(config), metrics, new ThreadCache(new LogContext("name "), 0, metrics));
}
@Override
protected StateManager stateManager() {
return new StateManagerStub();
}
@Override
@@ -254,5 +259,10 @@ public class AbstractProcessorContextTest {
@Override
public void registerCacheFlushListener(final String namespace, final DirtyEntryFlushListener listener) {
}
@Override
public String changelogFor(final String storeName) {
return ProcessorStateManager.storeChangelogTopic(applicationId(), storeName);
}
}
}

streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java (2 lines changed)

@@ -65,7 +65,7 @@ public class GlobalProcessorContextImplTest {
expect(streamsConfig.defaultKeySerde()).andReturn(Serdes.ByteArray());
replay(streamsConfig);
final StateManager stateManager = mock(StateManager.class);
final GlobalStateManager stateManager = mock(GlobalStateManager.class);
expect(stateManager.getGlobalStore(GLOBAL_STORE_NAME)).andReturn(mock(StateStore.class));
expect(stateManager.getGlobalStore(GLOBAL_KEY_VALUE_STORE_NAME)).andReturn(mock(KeyValueStore.class));
expect(stateManager.getGlobalStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME)).andReturn(mock(TimestampedKeyValueStore.class));

streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java (20 lines changed)

@@ -49,6 +49,7 @@ import static java.time.Duration.ofSeconds;
import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
@@ -1089,4 +1090,23 @@ public class InternalTopologyBuilderTest {
new RepartitionTopicConfig("Y-topic-1y", Collections.emptyMap())
);
}
@Test
public void shouldConnectGlobalStateStoreToInputTopic() {
final String globalStoreName = "global-store";
final String globalTopic = "global-topic";
builder.setApplicationId("X");
builder.addGlobalStore(
new MockKeyValueStoreBuilder(globalStoreName, false).withLoggingDisabled(),
"globalSource",
null,
null,
null,
globalTopic,
"global-processor",
new MockProcessorSupplier<>());
builder.initializeSubscription();
assertThat(builder.buildGlobalStateTopology().storeToChangelogTopic().get(globalStoreName), is(globalTopic));
}
}

streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java (24 lines changed)

@@ -17,6 +17,8 @@
package org.apache.kafka.streams.processor.internals;
import java.time.Duration;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
@@ -72,9 +74,14 @@ public class ProcessorContextImplTest {
private RecordCollector recordCollector = mock(RecordCollector.class);
private static final String KEY = "key";
private static final Bytes KEY_BYTES = Bytes.wrap(KEY.getBytes());
private static final long VALUE = 42L;
private static final byte[] VALUE_BYTES = String.valueOf(VALUE).getBytes();
private static final long TIMESTAMP = 21L;
private static final ValueAndTimestamp<Long> VALUE_AND_TIMESTAMP = ValueAndTimestamp.make(42L, 21L);
private static final String STORE_NAME = "underlying-store";
private static final String REGISTERED_STORE_NAME = "registered-store";
private static final TopicPartition CHANGELOG_PARTITION = new TopicPartition("store-changelog", 1);
private boolean flushExecuted;
private boolean putExecuted;
@@ -127,6 +134,7 @@ public class ProcessorContextImplTest {
expect(stateManager.getStore("LocalWindowStore")).andReturn(windowStoreMock());
expect(stateManager.getStore("LocalTimestampedWindowStore")).andReturn(timestampedWindowStoreMock());
expect(stateManager.getStore("LocalSessionStore")).andReturn(sessionStoreMock());
expect(stateManager.registeredChangelogPartitionFor(REGISTERED_STORE_NAME)).andStubReturn(CHANGELOG_PARTITION);
replay(stateManager);
@@ -373,16 +381,22 @@ public class ProcessorContextImplTest {
@Test
public void shouldNotSendRecordHeadersToChangelogTopic() {
final Bytes key = Bytes.wrap("key".getBytes());
final byte[] value = "zero".getBytes();
recordCollector.send(null, key, value, null, 0, 42L, BYTES_KEY_SERIALIZER, BYTEARRAY_VALUE_SERIALIZER);
recordCollector.send(
CHANGELOG_PARTITION.topic(),
KEY_BYTES,
VALUE_BYTES,
null,
CHANGELOG_PARTITION.partition(),
TIMESTAMP,
BYTES_KEY_SERIALIZER,
BYTEARRAY_VALUE_SERIALIZER
);
final StreamTask task = EasyMock.createNiceMock(StreamTask.class);
replay(recordCollector, task);
context.transitionToActive(task, recordCollector, null);
context.logChange("Store", key, value, 42L);
context.logChange(REGISTERED_STORE_NAME, KEY_BYTES, VALUE_BYTES, TIMESTAMP);
verify(recordCollector);
}

streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java (49 lines changed)

@@ -148,6 +148,17 @@ public class ProcessorStateManagerTest {
Utils.delete(baseDir);
}
@Test
public void shouldReturnDefaultChangelogTopicName() {
final String applicationId = "appId";
final String storeName = "store";
assertThat(
ProcessorStateManager.storeChangelogTopic(applicationId, storeName),
is(applicationId + "-" + storeName + "-changelog")
);
}
@Test
public void shouldReturnBaseDir() {
final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE);
@@ -438,6 +449,44 @@ public class ProcessorStateManagerTest {
}
}
@Test
public void shouldGetChangelogPartitionForRegisteredStore() {
final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE);
stateMgr.registerStore(persistentStore, persistentStore.stateRestoreCallback);
final TopicPartition changelogPartition = stateMgr.registeredChangelogPartitionFor(persistentStoreName);
assertThat(changelogPartition.topic(), is(persistentStoreTopicName));
assertThat(changelogPartition.partition(), is(taskId.partition));
}
@Test
public void shouldThrowIfStateStoreIsNotRegistered() {
final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE);
assertThrows("State store " + persistentStoreName
+ " for which the registered changelog partition should be"
+ " retrieved has not been registered",
IllegalStateException.class,
() -> stateMgr.registeredChangelogPartitionFor(persistentStoreName)
);
}
@Test
public void shouldThrowIfStateStoreHasLoggingDisabled() {
final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE);
final String storeName = "store-with-logging-disabled";
final MockKeyValueStore storeWithLoggingDisabled = new MockKeyValueStore(storeName, true);
stateMgr.registerStore(storeWithLoggingDisabled, null);
assertThrows("Registered state store " + storeName
+ " does not have a registered changelog partition."
+ " This may happen if logging is disabled for the state store.",
IllegalStateException.class,
() -> stateMgr.registeredChangelogPartitionFor(storeName)
);
}
@Test
public void shouldFlushCheckpointAndClose() throws IOException {
checkpoint.write(emptyMap());

streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java (4 lines changed)

@@ -65,4 +65,8 @@ public class StateManagerStub implements StateManager {
return null;
}
@Override
public String changelogFor(final String storeName) {
return null;
}
}

streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java (13 lines changed)

@@ -42,7 +42,7 @@ import java.util.List;
import java.util.Map;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.niceMock;
import static org.easymock.EasyMock.replay;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
@@ -89,13 +89,12 @@ public class GlobalStateStoreProviderTest {
Serdes.String(),
Serdes.String()).build());
final ProcessorContextImpl mockContext = mock(ProcessorContextImpl.class);
expect(mockContext.applicationId()).andReturn("appId").anyTimes();
final ProcessorContextImpl mockContext = niceMock(ProcessorContextImpl.class);
expect(mockContext.applicationId()).andStubReturn("appId");
expect(mockContext.metrics())
.andReturn(new StreamsMetricsImpl(new Metrics(), "threadName", StreamsConfig.METRICS_LATEST))
.anyTimes();
expect(mockContext.taskId()).andReturn(new TaskId(0, 0)).anyTimes();
expect(mockContext.recordCollector()).andReturn(null).anyTimes();
.andStubReturn(new StreamsMetricsImpl(new Metrics(), "threadName", StreamsConfig.METRICS_LATEST));
expect(mockContext.taskId()).andStubReturn(new TaskId(0, 0));
expect(mockContext.recordCollector()).andStubReturn(null);
replay(mockContext);
for (final StateStore store : stores.values()) {
store.init(mockContext, null);

streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java (102 lines changed)

@@ -23,13 +23,17 @@ import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -61,6 +65,7 @@ import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.niceMock;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -78,11 +83,19 @@ public class MeteredKeyValueStoreTest {
@Rule
public EasyMockRule rule = new EasyMockRule(this);
private static final String APPLICATION_ID = "test-app";
private static final String STORE_NAME = "store-name";
private static final String STORE_TYPE = "scope";
private static final String STORE_LEVEL_GROUP_FROM_0100_TO_24 = "stream-" + STORE_TYPE + "-state-metrics";
private static final String STORE_LEVEL_GROUP = "stream-state-metrics";
private static final String CHANGELOG_TOPIC = "changelog-topic";
private static final String THREAD_ID_TAG_KEY_FROM_0100_TO_24 = "client-id";
private static final String THREAD_ID_TAG_KEY = "thread-id";
private static final String KEY = "key";
private static final Bytes KEY_BYTES = Bytes.wrap(KEY.getBytes());
private static final String VALUE = "value";
private static final byte[] VALUE_BYTES = VALUE.getBytes();
private static final KeyValue<Bytes, byte[]> BYTE_KEY_VALUE_PAIR = KeyValue.pair(KEY_BYTES, VALUE_BYTES);
private final String threadId = Thread.currentThread().getName();
private final TaskId taskId = new TaskId(0, 0);
@@ -93,11 +106,6 @@ public class MeteredKeyValueStoreTest {
private InternalProcessorContext context;
private MeteredKeyValueStore<String, String> metered;
private final String key = "key";
private final Bytes keyBytes = Bytes.wrap(key.getBytes());
private final String value = "value";
private final byte[] valueBytes = value.getBytes();
private final KeyValue<Bytes, byte[]> byteKeyValuePair = KeyValue.pair(keyBytes, valueBytes);
private final Metrics metrics = new Metrics();
private String storeLevelGroup;
private String threadIdTagKey;
@@ -124,10 +132,11 @@ public class MeteredKeyValueStoreTest {
Serdes.String()
);
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
expect(context.metrics())
.andReturn(new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion)).anyTimes();
expect(context.taskId()).andReturn(taskId).anyTimes();
expect(inner.name()).andReturn("metered").anyTimes();
expect(context.applicationId()).andStubReturn(APPLICATION_ID);
expect(context.metrics()).andStubReturn(new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion));
expect(context.taskId()).andStubReturn(taskId);
expect(context.changelogFor(STORE_NAME)).andStubReturn(CHANGELOG_TOPIC);
expect(inner.name()).andStubReturn(STORE_NAME);
storeLevelGroup =
StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion) ? STORE_LEVEL_GROUP_FROM_0100_TO_24 : STORE_LEVEL_GROUP;
threadIdTagKey =
@@ -135,7 +144,7 @@ public class MeteredKeyValueStoreTest {
tags = mkMap(
mkEntry(threadIdTagKey, threadId),
mkEntry("task-id", taskId.toString()),
mkEntry(STORE_TYPE + "-state-id", "metered")
mkEntry(STORE_TYPE + "-state-id", STORE_NAME)
);
}
@@ -144,6 +153,47 @@ public class MeteredKeyValueStoreTest {
metered.init(context, metered);
}
@Test
public void shouldPassChangelogTopicNameToStateStoreSerde() {
doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC);
}
@Test
public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
final String defaultChangelogTopicName = ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME);
expect(context.changelogFor(STORE_NAME)).andReturn(null);
doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
}
private void doShouldPassChangelogTopicNameToStateStoreSerde(final String topic) {
final Serde<String> keySerde = niceMock(Serde.class);
final Serializer<String> keySerializer = mock(Serializer.class);
final Serde<String> valueSerde = niceMock(Serde.class);
final Deserializer<String> valueDeserializer = mock(Deserializer.class);
final Serializer<String> valueSerializer = mock(Serializer.class);
expect(keySerde.serializer()).andStubReturn(keySerializer);
expect(keySerializer.serialize(topic, KEY)).andStubReturn(KEY.getBytes());
expect(valueSerde.deserializer()).andStubReturn(valueDeserializer);
expect(valueDeserializer.deserialize(topic, VALUE_BYTES)).andStubReturn(VALUE);
expect(valueSerde.serializer()).andStubReturn(valueSerializer);
expect(valueSerializer.serialize(topic, VALUE)).andStubReturn(VALUE_BYTES);
expect(inner.get(KEY_BYTES)).andStubReturn(VALUE_BYTES);
replay(inner, context, keySerializer, keySerde, valueDeserializer, valueSerializer, valueSerde);
metered = new MeteredKeyValueStore<>(
inner,
STORE_TYPE,
new MockTime(),
keySerde,
valueSerde
);
metered.init(context, metered);
metered.get(KEY);
metered.put(KEY, VALUE);
verify(keySerializer, valueDeserializer, valueSerializer);
}
@Test
public void testMetrics() {
init();
@@ -159,7 +209,7 @@ public class MeteredKeyValueStoreTest {
threadId,
taskId.toString(),
STORE_TYPE,
"metered"
STORE_NAME
)));
if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) {
assertTrue(reporter.containsMbean(String.format(
@@ -176,11 +226,11 @@ public class MeteredKeyValueStoreTest {
@Test
public void shouldWriteBytesToInnerStoreAndRecordPutMetric() {
inner.put(eq(keyBytes), aryEq(valueBytes));
inner.put(eq(KEY_BYTES), aryEq(VALUE_BYTES));
expectLastCall();
init();
metered.put(key, value);
metered.put(KEY, VALUE);
final KafkaMetric metric = metric("put-rate");
assertTrue((Double) metric.metricValue() > 0);
@@ -189,10 +239,10 @@ public class MeteredKeyValueStoreTest {
@Test
public void shouldGetBytesFromInnerStoreAndReturnGetMetric() {
expect(inner.get(keyBytes)).andReturn(valueBytes);
expect(inner.get(KEY_BYTES)).andReturn(VALUE_BYTES);
init();
assertThat(metered.get(key), equalTo(value));
assertThat(metered.get(KEY), equalTo(VALUE));
final KafkaMetric metric = metric("get-rate");
assertTrue((Double) metric.metricValue() > 0);
@@ -201,10 +251,10 @@ public class MeteredKeyValueStoreTest {
@Test
public void shouldPutIfAbsentAndRecordPutIfAbsentMetric() {
expect(inner.putIfAbsent(eq(keyBytes), aryEq(valueBytes))).andReturn(null);
expect(inner.putIfAbsent(eq(KEY_BYTES), aryEq(VALUE_BYTES))).andReturn(null);
init();
metered.putIfAbsent(key, value);
metered.putIfAbsent(KEY, VALUE);
final KafkaMetric metric = metric("put-if-absent-rate");
assertTrue((Double) metric.metricValue() > 0);
@@ -218,7 +268,7 @@ public class MeteredKeyValueStoreTest {
expectLastCall();
init();
metered.putAll(Collections.singletonList(KeyValue.pair(key, value)));
metered.putAll(Collections.singletonList(KeyValue.pair(KEY, VALUE)));
final KafkaMetric metric = metric("put-all-rate");
assertTrue((Double) metric.metricValue() > 0);
@@ -227,10 +277,10 @@ public class MeteredKeyValueStoreTest {
@Test
public void shouldDeleteFromInnerStoreAndRecordDeleteMetric() {
expect(inner.delete(keyBytes)).andReturn(valueBytes);
expect(inner.delete(KEY_BYTES)).andReturn(VALUE_BYTES);
init();
metered.delete(key);
metered.delete(KEY);
final KafkaMetric metric = metric("delete-rate");
assertTrue((Double) metric.metricValue() > 0);
@@ -239,12 +289,12 @@ public class MeteredKeyValueStoreTest {
@Test
public void shouldGetRangeFromInnerStoreAndRecordRangeMetric() {
expect(inner.range(keyBytes, keyBytes))
.andReturn(new KeyValueIteratorStub<>(Collections.singletonList(byteKeyValuePair).iterator()));
expect(inner.range(KEY_BYTES, KEY_BYTES))
.andReturn(new KeyValueIteratorStub<>(Collections.singletonList(BYTE_KEY_VALUE_PAIR).iterator()));
init();
final KeyValueIterator<String, String> iterator = metered.range(key, key);
assertThat(iterator.next().value, equalTo(value));
final KeyValueIterator<String, String> iterator = metered.range(KEY, KEY);
assertThat(iterator.next().value, equalTo(VALUE));
assertFalse(iterator.hasNext());
iterator.close();
@@ -255,11 +305,11 @@ public class MeteredKeyValueStoreTest {
@Test
public void shouldGetAllFromInnerStoreAndRecordAllMetric() {
expect(inner.all()).andReturn(new KeyValueIteratorStub<>(Collections.singletonList(byteKeyValuePair).iterator()));
expect(inner.all()).andReturn(new KeyValueIteratorStub<>(Collections.singletonList(BYTE_KEY_VALUE_PAIR).iterator()));
init();
final KeyValueIterator<String, String> iterator = metered.all();
assertThat(iterator.next().value, equalTo(value));
assertThat(iterator.next().value, equalTo(VALUE));
assertFalse(iterator.hasNext());
iterator.close();

streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java (181 lines changed)

@@ -24,7 +24,10 @@ import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KeyValue;
@@ -33,6 +36,7 @@ import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
@@ -64,6 +68,7 @@ import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.niceMock;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -81,24 +86,32 @@ public class MeteredSessionStoreTest {
@Rule
public EasyMockRule rule = new EasyMockRule(this);
private static final String APPLICATION_ID = "test-app";
private static final String STORE_TYPE = "scope";
private static final String STORE_NAME = "mocked-store";
private static final String STORE_LEVEL_GROUP_FROM_0100_TO_24 = "stream-" + STORE_TYPE + "-state-metrics";
private static final String STORE_LEVEL_GROUP = "stream-state-metrics";
private static final String THREAD_ID_TAG_KEY_FROM_0100_TO_24 = "client-id";
private static final String THREAD_ID_TAG_KEY = "thread-id";
private static final String CHANGELOG_TOPIC = "changelog-topic";
private static final String KEY = "key";
private static final Bytes KEY_BYTES = Bytes.wrap(KEY.getBytes());
private static final Windowed<String> WINDOWED_KEY = new Windowed<>(KEY, new SessionWindow(0, 0));
private static final Windowed<Bytes> WINDOWED_KEY_BYTES = new Windowed<>(KEY_BYTES, new SessionWindow(0, 0));
private static final String VALUE = "value";
private static final byte[] VALUE_BYTES = VALUE.getBytes();
private static final long START_TIMESTAMP = 24L;
private static final long END_TIMESTAMP = 42L;
private final String threadId = Thread.currentThread().getName();
private final TaskId taskId = new TaskId(0, 0);
private final Metrics metrics = new Metrics();
private MeteredSessionStore<String, String> metered;
private MeteredSessionStore<String, String> store;
@Mock(type = MockType.NICE)
private SessionStore<Bytes, byte[]> inner;
private SessionStore<Bytes, byte[]> innerStore;
@Mock(type = MockType.NICE)
private InternalProcessorContext context;
private final String key = "a";
private final byte[] keyBytes = key.getBytes();
private final Windowed<Bytes> windowedKeyBytes = new Windowed<>(Bytes.wrap(keyBytes), new SessionWindow(0, 0));
private String storeLevelGroup;
private String threadIdTagKey;
private Map<String, String> tags;
@@ -116,17 +129,19 @@ public class MeteredSessionStoreTest {
@Before
public void before() {
metered = new MeteredSessionStore<>(
inner,
"scope",
store = new MeteredSessionStore<>(
innerStore,
STORE_TYPE,
Serdes.String(),
Serdes.String(),
new MockTime());
new MockTime()
);
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
expect(context.metrics())
.andReturn(new StreamsMetricsImpl(metrics, "test-client", builtInMetricsVersion)).anyTimes();
expect(context.taskId()).andReturn(taskId).anyTimes();
expect(inner.name()).andReturn("metered").anyTimes();
expect(context.applicationId()).andStubReturn(APPLICATION_ID);
expect(context.metrics()).andStubReturn(new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion));
expect(context.taskId()).andStubReturn(taskId);
expect(context.changelogFor(STORE_NAME)).andStubReturn(CHANGELOG_TOPIC);
expect(innerStore.name()).andStubReturn(STORE_NAME);
storeLevelGroup =
StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion) ? STORE_LEVEL_GROUP_FROM_0100_TO_24 : STORE_LEVEL_GROUP;
threadIdTagKey =
@@ -134,13 +149,55 @@ public class MeteredSessionStoreTest {
tags = mkMap(
mkEntry(threadIdTagKey, threadId),
mkEntry("task-id", taskId.toString()),
mkEntry(STORE_TYPE + "-state-id", "metered")
mkEntry(STORE_TYPE + "-state-id", STORE_NAME)
);
}
private void init() {
replay(inner, context);
metered.init(context, metered);
replay(innerStore, context);
store.init(context, store);
}
@Test
public void shouldPassChangelogTopicNameToStateStoreSerde() {
doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC);
}
@Test
public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
final String defaultChangelogTopicName =
ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME);
expect(context.changelogFor(STORE_NAME)).andReturn(null);
doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
}
private void doShouldPassChangelogTopicNameToStateStoreSerde(final String topic) {
final Serde<String> keySerde = niceMock(Serde.class);
final Serializer<String> keySerializer = mock(Serializer.class);
final Serde<String> valueSerde = niceMock(Serde.class);
final Deserializer<String> valueDeserializer = mock(Deserializer.class);
final Serializer<String> valueSerializer = mock(Serializer.class);
expect(keySerde.serializer()).andStubReturn(keySerializer);
expect(keySerializer.serialize(topic, KEY)).andStubReturn(KEY.getBytes());
expect(valueSerde.deserializer()).andStubReturn(valueDeserializer);
expect(valueDeserializer.deserialize(topic, VALUE_BYTES)).andStubReturn(VALUE);
expect(valueSerde.serializer()).andStubReturn(valueSerializer);
expect(valueSerializer.serialize(topic, VALUE)).andStubReturn(VALUE_BYTES);
expect(innerStore.fetchSession(KEY_BYTES, START_TIMESTAMP, END_TIMESTAMP)).andStubReturn(VALUE_BYTES);
replay(innerStore, context, keySerializer, keySerde, valueDeserializer, valueSerializer, valueSerde);
store = new MeteredSessionStore<>(
innerStore,
STORE_TYPE,
keySerde,
valueSerde,
new MockTime()
);
store.init(context, store);
store.fetchSession(KEY, START_TIMESTAMP, END_TIMESTAMP);
store.put(WINDOWED_KEY, VALUE);
verify(keySerializer, valueDeserializer, valueSerializer);
}
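    // Why these assertions matter, as a sketch: only serdes that actually
    // inspect the topic name can tell the correct changelog topic apart from
    // the default one. A hypothetical topic-sensitive serializer (illustrative
    // only, not part of this commit; assumes imports of
    // org.apache.kafka.common.errors.SerializationException and
    // java.nio.charset.StandardCharsets) fails loudly on the wrong topic:
    private static final class TopicBoundSerializer implements Serializer<String> {
        private final String expectedTopic;
        private TopicBoundSerializer(final String expectedTopic) {
            this.expectedTopic = expectedTopic;
        }
        @Override
        public byte[] serialize(final String topic, final String data) {
            if (!expectedTopic.equals(topic)) {
                // a store passing the default changelog name instead of the
                // source topic would throw here during (de)serialization
                throw new SerializationException("unexpected topic " + topic);
            }
            return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
        }
    }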
@Test
@@ -158,7 +215,7 @@ public class MeteredSessionStoreTest {
threadId,
taskId.toString(),
STORE_TYPE,
"metered"
STORE_NAME
)));
if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) {
assertTrue(reporter.containsMbean(String.format(
@@ -175,97 +232,97 @@ public class MeteredSessionStoreTest {
@Test
public void shouldWriteBytesToInnerStoreAndRecordPutMetric() {
inner.put(eq(windowedKeyBytes), aryEq(keyBytes));
innerStore.put(eq(WINDOWED_KEY_BYTES), aryEq(VALUE_BYTES));
expectLastCall();
init();
metered.put(new Windowed<>(key, new SessionWindow(0, 0)), key);
store.put(WINDOWED_KEY, VALUE);
final KafkaMetric metric = metric("put-rate");
assertTrue(((Double) metric.metricValue()) > 0);
verify(inner);
verify(innerStore);
}
@Test
public void shouldFindSessionsFromStoreAndRecordFetchMetric() {
expect(inner.findSessions(Bytes.wrap(keyBytes), 0, 0))
expect(innerStore.findSessions(KEY_BYTES, 0, 0))
.andReturn(new KeyValueIteratorStub<>(
Collections.singleton(KeyValue.pair(windowedKeyBytes, keyBytes)).iterator()));
Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator()));
init();
final KeyValueIterator<Windowed<String>, String> iterator = metered.findSessions(key, 0, 0);
assertThat(iterator.next().value, equalTo(key));
final KeyValueIterator<Windowed<String>, String> iterator = store.findSessions(KEY, 0, 0);
assertThat(iterator.next().value, equalTo(VALUE));
assertFalse(iterator.hasNext());
iterator.close();
final KafkaMetric metric = metric("fetch-rate");
assertTrue((Double) metric.metricValue() > 0);
verify(inner);
verify(innerStore);
}
@Test
public void shouldFindSessionRangeFromStoreAndRecordFetchMetric() {
expect(inner.findSessions(Bytes.wrap(keyBytes), Bytes.wrap(keyBytes), 0, 0))
expect(innerStore.findSessions(KEY_BYTES, KEY_BYTES, 0, 0))
.andReturn(new KeyValueIteratorStub<>(
Collections.singleton(KeyValue.pair(windowedKeyBytes, keyBytes)).iterator()));
Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator()));
init();
final KeyValueIterator<Windowed<String>, String> iterator = metered.findSessions(key, key, 0, 0);
assertThat(iterator.next().value, equalTo(key));
final KeyValueIterator<Windowed<String>, String> iterator = store.findSessions(KEY, KEY, 0, 0);
assertThat(iterator.next().value, equalTo(VALUE));
assertFalse(iterator.hasNext());
iterator.close();
final KafkaMetric metric = metric("fetch-rate");
assertTrue((Double) metric.metricValue() > 0);
verify(inner);
verify(innerStore);
}
@Test
public void shouldRemoveFromStoreAndRecordRemoveMetric() {
inner.remove(windowedKeyBytes);
innerStore.remove(WINDOWED_KEY_BYTES);
expectLastCall();
init();
metered.remove(new Windowed<>(key, new SessionWindow(0, 0)));
store.remove(new Windowed<>(KEY, new SessionWindow(0, 0)));
final KafkaMetric metric = metric("remove-rate");
assertTrue((Double) metric.metricValue() > 0);
verify(inner);
verify(innerStore);
}
@Test
public void shouldFetchForKeyAndRecordFetchMetric() {
expect(inner.fetch(Bytes.wrap(keyBytes)))
expect(innerStore.fetch(KEY_BYTES))
.andReturn(new KeyValueIteratorStub<>(
Collections.singleton(KeyValue.pair(windowedKeyBytes, keyBytes)).iterator()));
Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator()));
init();
final KeyValueIterator<Windowed<String>, String> iterator = metered.fetch(key);
assertThat(iterator.next().value, equalTo(key));
final KeyValueIterator<Windowed<String>, String> iterator = store.fetch(KEY);
assertThat(iterator.next().value, equalTo(VALUE));
assertFalse(iterator.hasNext());
iterator.close();
final KafkaMetric metric = metric("fetch-rate");
assertTrue((Double) metric.metricValue() > 0);
verify(inner);
verify(innerStore);
}
@Test
public void shouldFetchRangeFromStoreAndRecordFetchMetric() {
expect(inner.fetch(Bytes.wrap(keyBytes), Bytes.wrap(keyBytes)))
expect(innerStore.fetch(KEY_BYTES, KEY_BYTES))
.andReturn(new KeyValueIteratorStub<>(
Collections.singleton(KeyValue.pair(windowedKeyBytes, keyBytes)).iterator()));
Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator()));
init();
final KeyValueIterator<Windowed<String>, String> iterator = metered.fetch(key, key);
assertThat(iterator.next().value, equalTo(key));
final KeyValueIterator<Windowed<String>, String> iterator = store.fetch(KEY, KEY);
assertThat(iterator.next().value, equalTo(VALUE));
assertFalse(iterator.hasNext());
iterator.close();
final KafkaMetric metric = metric("fetch-rate");
assertTrue((Double) metric.metricValue() > 0);
verify(inner);
verify(innerStore);
}
@Test
@@ -277,50 +334,50 @@ public class MeteredSessionStoreTest {
@Test
public void shouldNotThrowNullPointerExceptionIfFetchSessionReturnsNull() {
expect(inner.fetchSession(Bytes.wrap("a".getBytes()), 0, Long.MAX_VALUE)).andReturn(null);
expect(innerStore.fetchSession(Bytes.wrap("a".getBytes()), 0, Long.MAX_VALUE)).andReturn(null);
init();
assertNull(metered.fetchSession("a", 0, Long.MAX_VALUE));
assertNull(store.fetchSession("a", 0, Long.MAX_VALUE));
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnPutIfKeyIsNull() {
metered.put(null, "a");
store.put(null, "a");
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnRemoveIfKeyIsNull() {
metered.remove(null);
store.remove(null);
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnFetchIfKeyIsNull() {
metered.fetch(null);
store.fetch(null);
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnFetchRangeIfFromIsNull() {
metered.fetch(null, "to");
store.fetch(null, "to");
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnFetchRangeIfToIsNull() {
metered.fetch("from", null);
store.fetch("from", null);
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnFindSessionsIfKeyIsNull() {
metered.findSessions(null, 0, 0);
store.findSessions(null, 0, 0);
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnFindSessionsRangeIfFromIsNull() {
metered.findSessions(null, "a", 0, 0);
store.findSessions(null, "a", 0, 0);
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnFindSessionsRangeIfToIsNull() {
metered.findSessions("a", null, 0, 0);
store.findSessions("a", null, 0, 0);
}
private interface CachedSessionStore extends SessionStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]> { }
@@ -333,45 +390,45 @@ public class MeteredSessionStoreTest {
expect(cachedSessionStore.setFlushListener(anyObject(CacheFlushListener.class), eq(false))).andReturn(true);
replay(cachedSessionStore);
metered = new MeteredSessionStore<>(
store = new MeteredSessionStore<>(
cachedSessionStore,
STORE_TYPE,
Serdes.String(),
Serdes.String(),
new MockTime());
assertTrue(metered.setFlushListener(null, false));
assertTrue(store.setFlushListener(null, false));
verify(cachedSessionStore);
}
@Test
public void shouldNotSetFlushListenerOnWrappedNoneCachingStore() {
assertFalse(metered.setFlushListener(null, false));
assertFalse(store.setFlushListener(null, false));
}
@Test
public void shouldRemoveMetricsOnClose() {
inner.close();
innerStore.close();
expectLastCall();
init(); // replays "innerStore"
// There's always a "count" metric registered
assertThat(storeMetrics(), not(empty()));
metered.close();
store.close();
assertThat(storeMetrics(), empty());
verify(inner);
verify(innerStore);
}
@Test
public void shouldRemoveMetricsEvenIfWrappedStoreThrowsOnClose() {
inner.close();
innerStore.close();
expectLastCall().andThrow(new RuntimeException("Oops!"));
init(); // replays "innerStore"
assertThat(storeMetrics(), not(empty()));
assertThrows(RuntimeException.class, metered::close);
assertThrows(RuntimeException.class, store::close);
assertThat(storeMetrics(), empty());
verify(inner);
verify(innerStore);
}
private KafkaMetric metric(final String name) {

128
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java

@@ -23,8 +23,10 @@ import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KeyValue;
@@ -32,6 +34,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -64,6 +67,7 @@ import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.niceMock;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -79,11 +83,21 @@ public class MeteredTimestampedKeyValueStoreTest {
@Rule
public EasyMockRule rule = new EasyMockRule(this);
private static final String APPLICATION_ID = "test-app";
private static final String STORE_NAME = "store-name";
private static final String STORE_TYPE = "scope";
private static final String STORE_LEVEL_GROUP_FROM_0100_TO_24 = "stream-" + STORE_TYPE + "-state-metrics";
private static final String STORE_LEVEL_GROUP = "stream-state-metrics";
private static final String CHANGELOG_TOPIC = "changelog-topic-name";
private static final String THREAD_ID_TAG_KEY_FROM_0100_TO_24 = "client-id";
private static final String THREAD_ID_TAG_KEY = "thread-id";
private static final String KEY = "key";
private static final Bytes KEY_BYTES = Bytes.wrap(KEY.getBytes());
private static final ValueAndTimestamp<String> VALUE_AND_TIMESTAMP =
ValueAndTimestamp.make("value", 97L);
// timestamp is 97, the ASCII code of 'a'
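// (layout note, as evidenced by the bytes below: ValueAndTimestampSerde
// prepends the raw 8-byte big-endian timestamp to the serialized value, so
// 97L serializes to seven NUL bytes plus 0x61 == 'a', then the "value" bytes)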
private static final byte[] VALUE_AND_TIMESTAMP_BYTES = "\0\0\0\0\0\0\0avalue".getBytes();
private final String threadId = Thread.currentThread().getName();
private final TaskId taskId = new TaskId(0, 0);
@@ -93,12 +107,9 @@ public class MeteredTimestampedKeyValueStoreTest {
private InternalProcessorContext context;
private MeteredTimestampedKeyValueStore<String, String> metered;
private final String key = "key";
private final Bytes keyBytes = Bytes.wrap(key.getBytes());
private final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make("value", 97L);
// timestamp is 97 what is ASCII of 'a'
private final byte[] valueAndTimestampBytes = "\0\0\0\0\0\0\0avalue".getBytes();
private final KeyValue<Bytes, byte[]> byteKeyValueTimestampPair = KeyValue.pair(keyBytes, valueAndTimestampBytes);
private final KeyValue<Bytes, byte[]> byteKeyValueTimestampPair = KeyValue.pair(KEY_BYTES,
VALUE_AND_TIMESTAMP_BYTES
);
private final Metrics metrics = new Metrics();
private String storeLevelGroup;
private String threadIdTagKey;
@@ -125,10 +136,12 @@ public class MeteredTimestampedKeyValueStoreTest {
new ValueAndTimestampSerde<>(Serdes.String())
);
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
expect(context.applicationId()).andStubReturn(APPLICATION_ID);
expect(context.metrics())
.andReturn(new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion)).anyTimes();
expect(context.taskId()).andReturn(taskId).anyTimes();
expect(inner.name()).andReturn("metered").anyTimes();
.andStubReturn(new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion));
expect(context.taskId()).andStubReturn(taskId);
expect(context.changelogFor(STORE_NAME)).andStubReturn(CHANGELOG_TOPIC);
expect(inner.name()).andStubReturn(STORE_NAME);
storeLevelGroup =
StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion) ? STORE_LEVEL_GROUP_FROM_0100_TO_24 : STORE_LEVEL_GROUP;
threadIdTagKey =
@@ -136,7 +149,7 @@ public class MeteredTimestampedKeyValueStoreTest {
tags = mkMap(
mkEntry(threadIdTagKey, threadId),
mkEntry("task-id", taskId.toString()),
mkEntry(STORE_TYPE + "-state-id", "metered")
mkEntry(STORE_TYPE + "-state-id", STORE_NAME)
);
}
@@ -146,6 +159,47 @@ public class MeteredTimestampedKeyValueStoreTest {
metered.init(context, metered);
}
@Test
public void shouldPassChangelogTopicNameToStateStoreSerde() {
doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC);
}
@Test
public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
final String defaultChangelogTopicName = ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME);
expect(context.changelogFor(STORE_NAME)).andReturn(null);
doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
}
private void doShouldPassChangelogTopicNameToStateStoreSerde(final String topic) {
final Serde<String> keySerde = niceMock(Serde.class);
final Serializer<String> keySerializer = mock(Serializer.class);
final Serde<ValueAndTimestamp<String>> valueSerde = niceMock(Serde.class);
final Deserializer<ValueAndTimestamp<String>> valueDeserializer = mock(Deserializer.class);
final Serializer<ValueAndTimestamp<String>> valueSerializer = mock(Serializer.class);
expect(keySerde.serializer()).andStubReturn(keySerializer);
expect(keySerializer.serialize(topic, KEY)).andStubReturn(KEY.getBytes());
expect(valueSerde.deserializer()).andStubReturn(valueDeserializer);
expect(valueDeserializer.deserialize(topic, VALUE_AND_TIMESTAMP_BYTES)).andStubReturn(VALUE_AND_TIMESTAMP);
expect(valueSerde.serializer()).andStubReturn(valueSerializer);
expect(valueSerializer.serialize(topic, VALUE_AND_TIMESTAMP)).andStubReturn(VALUE_AND_TIMESTAMP_BYTES);
expect(inner.get(KEY_BYTES)).andStubReturn(VALUE_AND_TIMESTAMP_BYTES);
replay(inner, context, keySerializer, keySerde, valueDeserializer, valueSerializer, valueSerde);
metered = new MeteredTimestampedKeyValueStore<>(
inner,
STORE_TYPE,
new MockTime(),
keySerde,
valueSerde
);
metered.init(context, metered);
metered.get(KEY);
metered.put(KEY, VALUE_AND_TIMESTAMP);
verify(keySerializer, valueDeserializer, valueSerializer);
}
@Test
public void testMetrics() {
init();
@@ -161,7 +215,7 @@ public class MeteredTimestampedKeyValueStoreTest {
threadId,
taskId.toString(),
STORE_TYPE,
"metered"
STORE_NAME
)));
if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) {
assertTrue(reporter.containsMbean(String.format(
@@ -177,11 +231,11 @@ public class MeteredTimestampedKeyValueStoreTest {
}
@Test
public void shouldWriteBytesToInnerStoreAndRecordPutMetric() {
inner.put(eq(keyBytes), aryEq(valueAndTimestampBytes));
inner.put(eq(KEY_BYTES), aryEq(VALUE_AND_TIMESTAMP_BYTES));
expectLastCall();
init();
metered.put(key, valueAndTimestamp);
metered.put(KEY, VALUE_AND_TIMESTAMP);
final KafkaMetric metric = metric("put-rate");
assertTrue((Double) metric.metricValue() > 0);
@@ -190,15 +244,15 @@ public class MeteredTimestampedKeyValueStoreTest {
@Test
public void shouldGetWithBinary() {
expect(inner.get(keyBytes)).andReturn(valueAndTimestampBytes);
expect(inner.get(KEY_BYTES)).andReturn(VALUE_AND_TIMESTAMP_BYTES);
inner.put(eq(keyBytes), aryEq(valueAndTimestampBytes));
inner.put(eq(KEY_BYTES), aryEq(VALUE_AND_TIMESTAMP_BYTES));
expectLastCall();
init();
final RawAndDeserializedValue<String> valueWithBinary = metered.getWithBinary(key);
assertEquals(valueWithBinary.value, valueAndTimestamp);
assertEquals(valueWithBinary.serializedValue, valueAndTimestampBytes);
final RawAndDeserializedValue<String> valueWithBinary = metered.getWithBinary(KEY);
assertEquals(valueWithBinary.value, VALUE_AND_TIMESTAMP);
assertEquals(valueWithBinary.serializedValue, VALUE_AND_TIMESTAMP_BYTES);
}
@SuppressWarnings("resource")
@@ -206,40 +260,38 @@ public class MeteredTimestampedKeyValueStoreTest {
public void shouldNotPutIfSameValuesAndGreaterTimestamp() {
init();
metered.put(key, valueAndTimestamp);
metered.put(KEY, VALUE_AND_TIMESTAMP);
final ValueAndTimestampSerde<String> stringSerde = new ValueAndTimestampSerde<>(Serdes.String());
final byte[] encodedOldValue = stringSerde.serializer().serialize("TOPIC", valueAndTimestamp);
final byte[] encodedOldValue = stringSerde.serializer().serialize("TOPIC", VALUE_AND_TIMESTAMP);
final ValueAndTimestamp<String> newValueAndTimestamp = ValueAndTimestamp.make("value", 98L);
assertFalse(metered.putIfDifferentValues(key,
newValueAndTimestamp,
encodedOldValue));
assertFalse(metered.putIfDifferentValues(KEY, newValueAndTimestamp, encodedOldValue));
verify(inner);
}
@SuppressWarnings("resource")
@Test
public void shouldPutIfOutOfOrder() {
inner.put(eq(keyBytes), aryEq(valueAndTimestampBytes));
inner.put(eq(KEY_BYTES), aryEq(VALUE_AND_TIMESTAMP_BYTES));
expectLastCall();
init();
metered.put(key, valueAndTimestamp);
metered.put(KEY, VALUE_AND_TIMESTAMP);
final ValueAndTimestampSerde<String> stringSerde = new ValueAndTimestampSerde<>(Serdes.String());
final byte[] encodedOldValue = stringSerde.serializer().serialize("TOPIC", valueAndTimestamp);
final byte[] encodedOldValue = stringSerde.serializer().serialize("TOPIC", VALUE_AND_TIMESTAMP);
final ValueAndTimestamp<String> outOfOrderValueAndTimestamp = ValueAndTimestamp.make("value", 95L);
assertTrue(metered.putIfDifferentValues(key, outOfOrderValueAndTimestamp, encodedOldValue));
assertTrue(metered.putIfDifferentValues(KEY, outOfOrderValueAndTimestamp, encodedOldValue));
verify(inner);
}
@Test
public void shouldGetBytesFromInnerStoreAndReturnGetMetric() {
expect(inner.get(keyBytes)).andReturn(valueAndTimestampBytes);
expect(inner.get(KEY_BYTES)).andReturn(VALUE_AND_TIMESTAMP_BYTES);
init();
assertThat(metered.get(key), equalTo(valueAndTimestamp));
assertThat(metered.get(KEY), equalTo(VALUE_AND_TIMESTAMP));
final KafkaMetric metric = metric("get-rate");
assertTrue((Double) metric.metricValue() > 0);
@@ -248,10 +300,10 @@ public class MeteredTimestampedKeyValueStoreTest {
@Test
public void shouldPutIfAbsentAndRecordPutIfAbsentMetric() {
expect(inner.putIfAbsent(eq(keyBytes), aryEq(valueAndTimestampBytes))).andReturn(null);
expect(inner.putIfAbsent(eq(KEY_BYTES), aryEq(VALUE_AND_TIMESTAMP_BYTES))).andReturn(null);
init();
metered.putIfAbsent(key, valueAndTimestamp);
metered.putIfAbsent(KEY, VALUE_AND_TIMESTAMP);
final KafkaMetric metric = metric("put-if-absent-rate");
assertTrue((Double) metric.metricValue() > 0);
@@ -269,7 +321,7 @@ public class MeteredTimestampedKeyValueStoreTest {
expectLastCall();
init();
metered.putAll(Collections.singletonList(KeyValue.pair(key, valueAndTimestamp)));
metered.putAll(Collections.singletonList(KeyValue.pair(KEY, VALUE_AND_TIMESTAMP)));
final KafkaMetric metric = metric("put-all-rate");
assertTrue((Double) metric.metricValue() > 0);
@@ -278,10 +330,10 @@ public class MeteredTimestampedKeyValueStoreTest {
@Test
public void shouldDeleteFromInnerStoreAndRecordDeleteMetric() {
expect(inner.delete(keyBytes)).andReturn(valueAndTimestampBytes);
expect(inner.delete(KEY_BYTES)).andReturn(VALUE_AND_TIMESTAMP_BYTES);
init();
metered.delete(key);
metered.delete(KEY);
final KafkaMetric metric = metric("delete-rate");
assertTrue((Double) metric.metricValue() > 0);
@@ -290,12 +342,12 @@ public class MeteredTimestampedKeyValueStoreTest {
@Test
public void shouldGetRangeFromInnerStoreAndRecordRangeMetric() {
expect(inner.range(keyBytes, keyBytes)).andReturn(
expect(inner.range(KEY_BYTES, KEY_BYTES)).andReturn(
new KeyValueIteratorStub<>(Collections.singletonList(byteKeyValueTimestampPair).iterator()));
init();
final KeyValueIterator<String, ValueAndTimestamp<String>> iterator = metered.range(key, key);
assertThat(iterator.next().value, equalTo(valueAndTimestamp));
final KeyValueIterator<String, ValueAndTimestamp<String>> iterator = metered.range(KEY, KEY);
assertThat(iterator.next().value, equalTo(VALUE_AND_TIMESTAMP));
assertFalse(iterator.hasNext());
iterator.close();
@@ -311,7 +363,7 @@ public class MeteredTimestampedKeyValueStoreTest {
init();
final KeyValueIterator<String, ValueAndTimestamp<String>> iterator = metered.all();
assertThat(iterator.next().value, equalTo(valueAndTimestamp));
assertThat(iterator.next().value, equalTo(VALUE_AND_TIMESTAMP));
assertFalse(iterator.hasNext());
iterator.close();

76
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java

@@ -19,12 +19,16 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;
@@ -36,25 +40,42 @@ import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.niceMock;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
public class MeteredTimestampedWindowStoreTest {
private static final String STORE_NAME = "mocked-store";
private static final String STORE_TYPE = "scope";
private static final String CHANGELOG_TOPIC = "changelog-topic";
private static final String KEY = "key";
private static final Bytes KEY_BYTES = Bytes.wrap(KEY.getBytes());
// timestamp is 97, the ASCII code of 'a'
private static final long TIMESTAMP = 97L;
private static final ValueAndTimestamp<String> VALUE_AND_TIMESTAMP =
ValueAndTimestamp.make("value", TIMESTAMP);
private static final byte[] VALUE_AND_TIMESTAMP_BYTES = "\0\0\0\0\0\0\0avalue".getBytes();
private static final int WINDOW_SIZE_MS = 10;
private InternalMockProcessorContext context;
@SuppressWarnings("unchecked")
private final WindowStore<Bytes, byte[]> innerStoreMock = EasyMock.createNiceMock(WindowStore.class);
private final MeteredTimestampedWindowStore<String, String> store = new MeteredTimestampedWindowStore<>(
private final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG));
private MeteredTimestampedWindowStore<String, String> store = new MeteredTimestampedWindowStore<>(
innerStoreMock,
10L, // any size
"scope",
WINDOW_SIZE_MS, // any size
STORE_TYPE,
new MockTime(),
Serdes.String(),
new ValueAndTimestampSerde<>(new SerdeThatDoesntHandleNull())
);
private final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG));
{
EasyMock.expect(innerStoreMock.name()).andReturn("mocked-store").anyTimes();
EasyMock.expect(innerStoreMock.name()).andStubReturn(STORE_NAME);
}
@Before
@@ -73,6 +94,49 @@ public class MeteredTimestampedWindowStoreTest {
);
}
@Test
public void shouldPassChangelogTopicNameToStateStoreSerde() {
context.addChangelogForStore(STORE_NAME, CHANGELOG_TOPIC);
doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC);
}
@Test
public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
final String defaultChangelogTopicName =
ProcessorStateManager.storeChangelogTopic(context.applicationId(), STORE_NAME);
doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
}
private void doShouldPassChangelogTopicNameToStateStoreSerde(final String topic) {
final Serde<String> keySerde = niceMock(Serde.class);
final Serializer<String> keySerializer = mock(Serializer.class);
final Serde<ValueAndTimestamp<String>> valueSerde = niceMock(Serde.class);
final Deserializer<ValueAndTimestamp<String>> valueDeserializer = mock(Deserializer.class);
final Serializer<ValueAndTimestamp<String>> valueSerializer = mock(Serializer.class);
expect(keySerde.serializer()).andStubReturn(keySerializer);
expect(keySerializer.serialize(topic, KEY)).andStubReturn(KEY.getBytes());
expect(valueSerde.deserializer()).andStubReturn(valueDeserializer);
expect(valueDeserializer.deserialize(topic, VALUE_AND_TIMESTAMP_BYTES)).andStubReturn(VALUE_AND_TIMESTAMP);
expect(valueSerde.serializer()).andStubReturn(valueSerializer);
expect(valueSerializer.serialize(topic, VALUE_AND_TIMESTAMP)).andStubReturn(VALUE_AND_TIMESTAMP_BYTES);
expect(innerStoreMock.fetch(KEY_BYTES, TIMESTAMP)).andStubReturn(VALUE_AND_TIMESTAMP_BYTES);
replay(innerStoreMock, keySerializer, keySerde, valueDeserializer, valueSerializer, valueSerde);
store = new MeteredTimestampedWindowStore<>(
innerStoreMock,
WINDOW_SIZE_MS,
STORE_TYPE,
new MockTime(),
keySerde,
valueSerde
);
store.init(context, store);
store.fetch(KEY, TIMESTAMP);
store.put(KEY, VALUE_AND_TIMESTAMP, TIMESTAMP);
verify(keySerializer, valueDeserializer, valueSerializer);
}
@Test
public void shouldCloseUnderlyingStore() {
innerStoreMock.close();

64
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java

@@ -24,12 +24,16 @@ import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.InternalMockProcessorContext;
@@ -61,6 +65,7 @@ import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.niceMock;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -81,14 +86,20 @@ public class MeteredWindowStoreTest {
private static final String THREAD_ID_TAG_KEY_FROM_0100_TO_24 = "client-id";
private static final String THREAD_ID_TAG_KEY = "thread-id";
private static final String STORE_NAME = "mocked-store";
private static final String CHANGELOG_TOPIC = "changelog-topic";
private static final String KEY = "key";
private static final Bytes KEY_BYTES = Bytes.wrap(KEY.getBytes());
private static final String VALUE = "value";
private static final byte[] VALUE_BYTES = VALUE.getBytes();
private static final int WINDOW_SIZE_MS = 10;
private static final long TIMESTAMP = 42L;
private final String threadId = Thread.currentThread().getName();
private InternalMockProcessorContext context;
@SuppressWarnings("unchecked")
private final WindowStore<Bytes, byte[]> innerStoreMock = createNiceMock(WindowStore.class);
private final MeteredWindowStore<String, String> store = new MeteredWindowStore<>(
private MeteredWindowStore<String, String> store = new MeteredWindowStore<>(
innerStoreMock,
10L, // any size
WINDOW_SIZE_MS, // any size
STORE_TYPE,
new MockTime(),
Serdes.String(),
@@ -116,9 +127,7 @@ public class MeteredWindowStoreTest {
@Before
public void setUp() {
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion);
context = new InternalMockProcessorContext(
TestUtils.tempDirectory(),
Serdes.String(),
@@ -139,6 +148,49 @@ public class MeteredWindowStoreTest {
);
}
@Test
public void shouldPassChangelogTopicNameToStateStoreSerde() {
context.addChangelogForStore(STORE_NAME, CHANGELOG_TOPIC);
doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC);
}
@Test
public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
final String defaultChangelogTopicName =
ProcessorStateManager.storeChangelogTopic(context.applicationId(), STORE_NAME);
doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
}
private void doShouldPassChangelogTopicNameToStateStoreSerde(final String topic) {
final Serde<String> keySerde = niceMock(Serde.class);
final Serializer<String> keySerializer = mock(Serializer.class);
final Serde<String> valueSerde = niceMock(Serde.class);
final Deserializer<String> valueDeserializer = mock(Deserializer.class);
final Serializer<String> valueSerializer = mock(Serializer.class);
expect(keySerde.serializer()).andStubReturn(keySerializer);
expect(keySerializer.serialize(topic, KEY)).andStubReturn(KEY.getBytes());
expect(valueSerde.deserializer()).andStubReturn(valueDeserializer);
expect(valueDeserializer.deserialize(topic, VALUE_BYTES)).andStubReturn(VALUE);
expect(valueSerde.serializer()).andStubReturn(valueSerializer);
expect(valueSerializer.serialize(topic, VALUE)).andStubReturn(VALUE_BYTES);
expect(innerStoreMock.fetch(KEY_BYTES, TIMESTAMP)).andStubReturn(VALUE_BYTES);
replay(innerStoreMock, keySerializer, keySerde, valueDeserializer, valueSerializer, valueSerde);
store = new MeteredWindowStore<>(
innerStoreMock,
WINDOW_SIZE_MS,
STORE_TYPE,
new MockTime(),
keySerde,
valueSerde
);
store.init(context, store);
store.fetch(KEY, TIMESTAMP);
store.put(KEY, VALUE, TIMESTAMP);
verify(keySerializer, valueDeserializer, valueSerializer);
}
@Test
public void testMetrics() {
replay(innerStoreMock);

5
streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java

@@ -88,4 +88,9 @@ public class GlobalStateManagerStub implements GlobalStateManager {
public TaskType taskType() {
return TaskType.GLOBAL;
}
@Override
public String changelogFor(final String storeName) {
return null;
}
}

18
streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java

@@ -37,6 +37,8 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.StateManager;
import org.apache.kafka.streams.processor.internals.StateManagerStub;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.internals.ToInternal;
@@ -70,6 +72,7 @@ public class InternalMockProcessorContext
private Serde<?> keySerde;
private Serde<?> valueSerde;
private long timestamp = -1L;
private final Map<String, String> storeToChangelogTopic = new HashMap<>();
public InternalMockProcessorContext() {
this(null,
@@ -182,7 +185,6 @@ public class InternalMockProcessorContext
new TaskId(0, 0),
config,
metrics,
null,
cache
);
super.setCurrentNode(new ProcessorNode<>("TESTING_NODE"));
@@ -193,6 +195,11 @@ public class InternalMockProcessorContext
this.metrics().setRocksDBMetricsRecordingTrigger(new RocksDBMetricsRecordingTrigger(new SystemTime()));
}
@Override
protected StateManager stateManager() {
return new StateManagerStub();
}
@Override
public RecordCollector recordCollector() {
final RecordCollector recordCollector = recordCollectorSupplier.recordCollector();
@@ -400,4 +407,13 @@ public class InternalMockProcessorContext
}
restoreCallback.restoreBatch(records);
}
public void addChangelogForStore(final String storeName, final String changelogTopic) {
storeToChangelogTopic.put(storeName, changelogTopic);
}
@Override
public String changelogFor(final String storeName) {
return storeToChangelogTopic.get(storeName);
}
}
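For reference, a minimal usage sketch of the new test hook (hypothetical test fragment; JUnit's assertEquals/assertNull assumed):
final InternalMockProcessorContext context = new InternalMockProcessorContext();
// register an explicit changelog topic before the store is initialized
context.addChangelogForStore("mocked-store", "changelog-topic");
assertEquals("changelog-topic", context.changelogFor("mocked-store"));
// no mapping registered: null signals callers to fall back to the default name
assertNull(context.changelogFor("unregistered-store"));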

5
streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java

@@ -145,4 +145,9 @@ public class MockInternalProcessorContext extends MockProcessorContext implement
@Override
public void registerCacheFlushListener(final String namespace, final DirtyEntryFlushListener listener) {
}
@Override
public String changelogFor(final String storeName) {
return "mock-changelog";
}
}

16
streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java

@@ -33,7 +33,11 @@ import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.StateManager;
import org.apache.kafka.streams.processor.internals.StateManagerStub;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -45,7 +49,7 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
public Map<Object, Object> forwardedValues = new HashMap<>();
public NoOpProcessorContext() {
super(new TaskId(1, 1), streamsConfig(), new MockStreamsMetrics(new Metrics()), null, null);
super(new TaskId(1, 1), streamsConfig(), new MockStreamsMetrics(new Metrics()), null);
}
private static StreamsConfig streamsConfig() {
@@ -55,6 +59,11 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
return new StreamsConfig(props);
}
@Override
protected StateManager stateManager() {
return new StateManagerStub();
}
@Override
public StateStore getStateStore(final String name) {
return null;
@@ -134,4 +143,9 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
public void registerCacheFlushListener(final String namespace, final DirtyEntryFlushListener listener) {
cache.addDirtyEntryFlushListener(namespace, listener);
}
@Override
public String changelogFor(final String storeName) {
return ProcessorStateManager.storeChangelogTopic(applicationId(), storeName);
}
}
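The override above reuses the production default-naming helper; a quick sketch of the convention it encodes (assuming the standard <applicationId>-<storeName>-changelog pattern; JUnit assertEquals assumed):
// e.g. for application id "test-app" and store "store-name"
final String topic = ProcessorStateManager.storeChangelogTopic("test-app", "store-name");
assertEquals("test-app-store-name-changelog", topic);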
