From c526c0c3f6f153ef8f84a3e66d9f997d8c31e20c Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Tue, 27 Sep 2016 17:43:36 -0700 Subject: [PATCH] MINOR: add test to make sure ProcessorStateManager can handle State Stores with logging disabled Adding the test so we know that the State Stores with logging disabled or without a topic don't throw any exceptions. Author: Damian Guy Reviewers: Guozhang Wang Closes #1916 from dguy/state-store-logging-disabled --- .../processor/internals/ProcessorStateManagerTest.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 5802b29113f..7c2220227ae 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -48,6 +48,7 @@ import java.util.Map; import java.util.Set; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; @@ -461,4 +462,12 @@ public class ProcessorStateManagerTest { assertEquals(new Long(123L + 1L), checkpointedOffsets.get(new TopicPartition(persistentStoreTopicName, 1))); } + @Test + public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throws Exception { + MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, null, Collections.emptyMap()); + stateMgr.register(mockStateStore, false, mockStateStore.stateRestoreCallback); + assertNotNull(stateMgr.getStore(nonPersistentStoreName)); + } + }