Browse Source

MINOR: add test to make sure ProcessorStateManager can handle State Stores with logging disabled

Adding the test so we know that the State Stores with logging disabled or without a topic don't throw any exceptions.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1916 from dguy/state-store-logging-disabled
pull/1925/head
Damian Guy 8 years ago committed by Guozhang Wang
parent
commit
c526c0c3f6
  1. 9
      streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java

9
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java

@ -48,6 +48,7 @@ import java.util.Map; @@ -48,6 +48,7 @@ import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
@ -461,4 +462,12 @@ public class ProcessorStateManagerTest { @@ -461,4 +462,12 @@ public class ProcessorStateManagerTest {
assertEquals(new Long(123L + 1L), checkpointedOffsets.get(new TopicPartition(persistentStoreTopicName, 1)));
}
@Test
public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throws Exception {
MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, null, Collections.<StateStore, ProcessorNode>emptyMap());
stateMgr.register(mockStateStore, false, mockStateStore.stateRestoreCallback);
assertNotNull(stateMgr.getStore(nonPersistentStoreName));
}
}

Loading…
Cancel
Save