Browse Source

KAFKA-14133: Move RocksDBRangeIteratorTest, TimestampedKeyValueStoreBuilderTest and TimestampedSegmentTest to Mockito (#14222)

Reviewers: Divij Vaidya <diviv@amazon.com>
pull/14316/head
Christo Lolov 1 year ago committed by GitHub
parent
commit
dbda60c60d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 281
      streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBRangeIteratorTest.java
  2. 53
      streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java
  3. 13
      streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java

281
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBRangeIteratorTest.java

@ -19,19 +19,22 @@ package org.apache.kafka.streams.state.internals; @@ -19,19 +19,22 @@ package org.apache.kafka.streams.state.internals;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.utils.Bytes;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import org.rocksdb.RocksIterator;
import java.util.NoSuchElementException;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class RocksDBRangeIteratorTest {
private final String storeName = "store";
@ -50,20 +53,18 @@ public class RocksDBRangeIteratorTest { @@ -50,20 +53,18 @@ public class RocksDBRangeIteratorTest {
@Test
public void shouldReturnAllKeysInTheRangeInForwardDirection() {
final RocksIterator rocksIterator = mock(RocksIterator.class);
rocksIterator.seek(key1Bytes.get());
expect(rocksIterator.isValid())
.andReturn(true)
.andReturn(true)
.andReturn(true)
.andReturn(false);
expect(rocksIterator.key())
.andReturn(key1Bytes.get())
.andReturn(key2Bytes.get())
.andReturn(key3Bytes.get());
expect(rocksIterator.value()).andReturn(valueBytes).times(3);
rocksIterator.next();
expectLastCall().times(3);
replay(rocksIterator);
doNothing().when(rocksIterator).seek(key1Bytes.get());
when(rocksIterator.isValid())
.thenReturn(true)
.thenReturn(true)
.thenReturn(true)
.thenReturn(false);
when(rocksIterator.key())
.thenReturn(key1Bytes.get())
.thenReturn(key2Bytes.get())
.thenReturn(key3Bytes.get());
when(rocksIterator.value()).thenReturn(valueBytes);
doNothing().when(rocksIterator).next();
final RocksDBRangeIterator rocksDBRangeIterator = new RocksDBRangeIterator(
storeName,
rocksIterator,
@ -79,26 +80,25 @@ public class RocksDBRangeIteratorTest { @@ -79,26 +80,25 @@ public class RocksDBRangeIteratorTest {
assertThat(rocksDBRangeIterator.hasNext(), is(true));
assertThat(rocksDBRangeIterator.next().key, is(key3Bytes));
assertThat(rocksDBRangeIterator.hasNext(), is(false));
verify(rocksIterator);
verify(rocksIterator, times(3)).value();
verify(rocksIterator, times(3)).next();
}
@Test
public void shouldReturnAllKeysInTheRangeReverseDirection() {
final RocksIterator rocksIterator = mock(RocksIterator.class);
rocksIterator.seekForPrev(key3Bytes.get());
expect(rocksIterator.isValid())
.andReturn(true)
.andReturn(true)
.andReturn(true)
.andReturn(false);
expect(rocksIterator.key())
.andReturn(key3Bytes.get())
.andReturn(key2Bytes.get())
.andReturn(key1Bytes.get());
expect(rocksIterator.value()).andReturn(valueBytes).times(3);
rocksIterator.prev();
expectLastCall().times(3);
replay(rocksIterator);
doNothing().when(rocksIterator).seekForPrev(key3Bytes.get());
when(rocksIterator.isValid())
.thenReturn(true)
.thenReturn(true)
.thenReturn(true)
.thenReturn(false);
when(rocksIterator.key())
.thenReturn(key3Bytes.get())
.thenReturn(key2Bytes.get())
.thenReturn(key1Bytes.get());
when(rocksIterator.value()).thenReturn(valueBytes);
doNothing().when(rocksIterator).prev();
final RocksDBRangeIterator rocksDBRangeIterator = new RocksDBRangeIterator(
storeName,
rocksIterator,
@ -114,29 +114,28 @@ public class RocksDBRangeIteratorTest { @@ -114,29 +114,28 @@ public class RocksDBRangeIteratorTest {
assertThat(rocksDBRangeIterator.hasNext(), is(true));
assertThat(rocksDBRangeIterator.next().key, is(key1Bytes));
assertThat(rocksDBRangeIterator.hasNext(), is(false));
verify(rocksIterator);
verify(rocksIterator, times(3)).value();
verify(rocksIterator, times(3)).prev();
}
@Test
public void shouldReturnAllKeysWhenLastKeyIsGreaterThanLargestKeyInStateStoreInForwardDirection() {
final Bytes toBytes = Bytes.increment(key4Bytes);
final RocksIterator rocksIterator = mock(RocksIterator.class);
rocksIterator.seek(key1Bytes.get());
expect(rocksIterator.isValid())
.andReturn(true)
.andReturn(true)
.andReturn(true)
.andReturn(true)
.andReturn(false);
expect(rocksIterator.key())
.andReturn(key1Bytes.get())
.andReturn(key2Bytes.get())
.andReturn(key3Bytes.get())
.andReturn(key4Bytes.get());
expect(rocksIterator.value()).andReturn(valueBytes).times(4);
rocksIterator.next();
expectLastCall().times(4);
replay(rocksIterator);
doNothing().when(rocksIterator).seek(key1Bytes.get());
when(rocksIterator.isValid())
.thenReturn(true)
.thenReturn(true)
.thenReturn(true)
.thenReturn(true)
.thenReturn(false);
when(rocksIterator.key())
.thenReturn(key1Bytes.get())
.thenReturn(key2Bytes.get())
.thenReturn(key3Bytes.get())
.thenReturn(key4Bytes.get());
when(rocksIterator.value()).thenReturn(valueBytes);
doNothing().when(rocksIterator).next();
final RocksDBRangeIterator rocksDBRangeIterator = new RocksDBRangeIterator(
storeName,
rocksIterator,
@ -154,29 +153,28 @@ public class RocksDBRangeIteratorTest { @@ -154,29 +153,28 @@ public class RocksDBRangeIteratorTest {
assertThat(rocksDBRangeIterator.hasNext(), is(true));
assertThat(rocksDBRangeIterator.next().key, is(key4Bytes));
assertThat(rocksDBRangeIterator.hasNext(), is(false));
verify(rocksIterator);
verify(rocksIterator, times(4)).value();
verify(rocksIterator, times(4)).next();
}
@Test
public void shouldReturnAllKeysWhenLastKeyIsSmallerThanSmallestKeyInStateStoreInReverseDirection() {
final RocksIterator rocksIterator = mock(RocksIterator.class);
rocksIterator.seekForPrev(key4Bytes.get());
expect(rocksIterator.isValid())
.andReturn(true)
.andReturn(true)
.andReturn(true)
.andReturn(true)
.andReturn(false);
expect(rocksIterator.key())
.andReturn(key4Bytes.get())
.andReturn(key3Bytes.get())
.andReturn(key2Bytes.get())
.andReturn(key1Bytes.get());
expect(rocksIterator.value()).andReturn(valueBytes).times(4);
rocksIterator.prev();
expectLastCall().times(4);
replay(rocksIterator);
doNothing().when(rocksIterator).seekForPrev(key4Bytes.get());
when(rocksIterator.isValid())
.thenReturn(true)
.thenReturn(true)
.thenReturn(true)
.thenReturn(true)
.thenReturn(false);
when(rocksIterator.key())
.thenReturn(key4Bytes.get())
.thenReturn(key3Bytes.get())
.thenReturn(key2Bytes.get())
.thenReturn(key1Bytes.get());
when(rocksIterator.value()).thenReturn(valueBytes);
doNothing().when(rocksIterator).prev();
final RocksDBRangeIterator rocksDBRangeIterator = new RocksDBRangeIterator(
storeName,
rocksIterator,
@ -194,7 +192,8 @@ public class RocksDBRangeIteratorTest { @@ -194,7 +192,8 @@ public class RocksDBRangeIteratorTest {
assertThat(rocksDBRangeIterator.hasNext(), is(true));
assertThat(rocksDBRangeIterator.next().key, is(key1Bytes));
assertThat(rocksDBRangeIterator.hasNext(), is(false));
verify(rocksIterator);
verify(rocksIterator, times(4)).value();
verify(rocksIterator, times(4)).prev();
}
@ -202,9 +201,8 @@ public class RocksDBRangeIteratorTest { @@ -202,9 +201,8 @@ public class RocksDBRangeIteratorTest {
public void shouldReturnNoKeysWhenLastKeyIsSmallerThanSmallestKeyInStateStoreForwardDirection() {
// key range in state store: [c-f]
final RocksIterator rocksIterator = mock(RocksIterator.class);
rocksIterator.seek(key1Bytes.get());
expect(rocksIterator.isValid()).andReturn(false);
replay(rocksIterator);
doNothing().when(rocksIterator).seek(key1Bytes.get());
when(rocksIterator.isValid()).thenReturn(false);
final RocksDBRangeIterator rocksDBRangeIterator = new RocksDBRangeIterator(
storeName,
rocksIterator,
@ -214,7 +212,6 @@ public class RocksDBRangeIteratorTest { @@ -214,7 +212,6 @@ public class RocksDBRangeIteratorTest {
true
);
assertThat(rocksDBRangeIterator.hasNext(), is(false));
verify(rocksIterator);
}
@Test
@ -225,10 +222,9 @@ public class RocksDBRangeIteratorTest { @@ -225,10 +222,9 @@ public class RocksDBRangeIteratorTest {
final Bytes fromBytes = Bytes.wrap(from.getBytes());
final Bytes toBytes = Bytes.wrap(to.getBytes());
final RocksIterator rocksIterator = mock(RocksIterator.class);
rocksIterator.seekForPrev(toBytes.get());
expect(rocksIterator.isValid())
.andReturn(false);
replay(rocksIterator);
doNothing().when(rocksIterator).seekForPrev(toBytes.get());
when(rocksIterator.isValid())
.thenReturn(false);
final RocksDBRangeIterator rocksDBRangeIterator = new RocksDBRangeIterator(
storeName,
rocksIterator,
@ -238,24 +234,21 @@ public class RocksDBRangeIteratorTest { @@ -238,24 +234,21 @@ public class RocksDBRangeIteratorTest {
true
);
assertThat(rocksDBRangeIterator.hasNext(), is(false));
verify(rocksIterator);
}
@Test
public void shouldReturnAllKeysInPartiallyOverlappingRangeInForwardDirection() {
final RocksIterator rocksIterator = mock(RocksIterator.class);
rocksIterator.seek(key1Bytes.get());
expect(rocksIterator.isValid())
.andReturn(true)
.andReturn(true)
.andReturn(false);
expect(rocksIterator.key())
.andReturn(key2Bytes.get())
.andReturn(key3Bytes.get());
expect(rocksIterator.value()).andReturn(valueBytes).times(2);
rocksIterator.next();
expectLastCall().times(2);
replay(rocksIterator);
doNothing().when(rocksIterator).seek(key1Bytes.get());
when(rocksIterator.isValid())
.thenReturn(true)
.thenReturn(true)
.thenReturn(false);
when(rocksIterator.key())
.thenReturn(key2Bytes.get())
.thenReturn(key3Bytes.get());
when(rocksIterator.value()).thenReturn(valueBytes);
doNothing().when(rocksIterator).next();
final RocksDBRangeIterator rocksDBRangeIterator = new RocksDBRangeIterator(
storeName,
rocksIterator,
@ -269,7 +262,8 @@ public class RocksDBRangeIteratorTest { @@ -269,7 +262,8 @@ public class RocksDBRangeIteratorTest {
assertThat(rocksDBRangeIterator.hasNext(), is(true));
assertThat(rocksDBRangeIterator.next().key, is(key3Bytes));
assertThat(rocksDBRangeIterator.hasNext(), is(false));
verify(rocksIterator);
verify(rocksIterator, times(2)).value();
verify(rocksIterator, times(2)).next();
}
@Test
@ -277,18 +271,16 @@ public class RocksDBRangeIteratorTest { @@ -277,18 +271,16 @@ public class RocksDBRangeIteratorTest {
final RocksIterator rocksIterator = mock(RocksIterator.class);
final String to = "e";
final Bytes toBytes = Bytes.wrap(to.getBytes());
rocksIterator.seekForPrev(toBytes.get());
expect(rocksIterator.isValid())
.andReturn(true)
.andReturn(true)
.andReturn(false);
expect(rocksIterator.key())
.andReturn(key4Bytes.get())
.andReturn(key3Bytes.get());
expect(rocksIterator.value()).andReturn(valueBytes).times(2);
rocksIterator.prev();
expectLastCall().times(2);
replay(rocksIterator);
doNothing().when(rocksIterator).seekForPrev(toBytes.get());
when(rocksIterator.isValid())
.thenReturn(true)
.thenReturn(true)
.thenReturn(false);
when(rocksIterator.key())
.thenReturn(key4Bytes.get())
.thenReturn(key3Bytes.get());
when(rocksIterator.value()).thenReturn(valueBytes);
doNothing().when(rocksIterator).prev();
final RocksDBRangeIterator rocksDBRangeIterator = new RocksDBRangeIterator(
storeName,
rocksIterator,
@ -302,24 +294,23 @@ public class RocksDBRangeIteratorTest { @@ -302,24 +294,23 @@ public class RocksDBRangeIteratorTest {
assertThat(rocksDBRangeIterator.hasNext(), is(true));
assertThat(rocksDBRangeIterator.next().key, is(key3Bytes));
assertThat(rocksDBRangeIterator.hasNext(), is(false));
verify(rocksIterator);
verify(rocksIterator, times(2)).value();
verify(rocksIterator, times(2)).prev();
}
@Test
public void shouldReturnTheCurrentKeyOnInvokingPeekNextKeyInForwardDirection() {
final RocksIterator rocksIterator = mock(RocksIterator.class);
rocksIterator.seek(key1Bytes.get());
expect(rocksIterator.isValid())
.andReturn(true)
.andReturn(true)
.andReturn(false);
expect(rocksIterator.key())
.andReturn(key2Bytes.get())
.andReturn(key3Bytes.get());
expect(rocksIterator.value()).andReturn(valueBytes).times(2);
rocksIterator.next();
expectLastCall().times(2);
replay(rocksIterator);
doNothing().when(rocksIterator).seek(key1Bytes.get());
when(rocksIterator.isValid())
.thenReturn(true)
.thenReturn(true)
.thenReturn(false);
when(rocksIterator.key())
.thenReturn(key2Bytes.get())
.thenReturn(key3Bytes.get());
when(rocksIterator.value()).thenReturn(valueBytes);
doNothing().when(rocksIterator).next();
final RocksDBRangeIterator rocksDBRangeIterator = new RocksDBRangeIterator(
storeName,
rocksIterator,
@ -338,25 +329,24 @@ public class RocksDBRangeIteratorTest { @@ -338,25 +329,24 @@ public class RocksDBRangeIteratorTest {
assertThat(rocksDBRangeIterator.next().key, is(key3Bytes));
assertThat(rocksDBRangeIterator.hasNext(), is(false));
assertThrows(NoSuchElementException.class, rocksDBRangeIterator::peekNextKey);
verify(rocksIterator);
verify(rocksIterator, times(2)).value();
verify(rocksIterator, times(2)).next();
}
@Test
public void shouldReturnTheCurrentKeyOnInvokingPeekNextKeyInReverseDirection() {
final RocksIterator rocksIterator = mock(RocksIterator.class);
final Bytes toBytes = Bytes.increment(key4Bytes);
rocksIterator.seekForPrev(toBytes.get());
expect(rocksIterator.isValid())
.andReturn(true)
.andReturn(true)
.andReturn(false);
expect(rocksIterator.key())
.andReturn(key4Bytes.get())
.andReturn(key3Bytes.get());
expect(rocksIterator.value()).andReturn(valueBytes).times(2);
rocksIterator.prev();
expectLastCall().times(2);
replay(rocksIterator);
doNothing().when(rocksIterator).seekForPrev(toBytes.get());
when(rocksIterator.isValid())
.thenReturn(true)
.thenReturn(true)
.thenReturn(false);
when(rocksIterator.key())
.thenReturn(key4Bytes.get())
.thenReturn(key3Bytes.get());
when(rocksIterator.value()).thenReturn(valueBytes);
doNothing().when(rocksIterator).prev();
final RocksDBRangeIterator rocksDBRangeIterator = new RocksDBRangeIterator(
storeName,
rocksIterator,
@ -375,16 +365,15 @@ public class RocksDBRangeIteratorTest { @@ -375,16 +365,15 @@ public class RocksDBRangeIteratorTest {
assertThat(rocksDBRangeIterator.next().key, is(key3Bytes));
assertThat(rocksDBRangeIterator.hasNext(), is(false));
assertThrows(NoSuchElementException.class, rocksDBRangeIterator::peekNextKey);
verify(rocksIterator);
verify(rocksIterator, times(2)).value();
verify(rocksIterator, times(2)).prev();
}
@Test
public void shouldCloseIterator() {
final RocksIterator rocksIterator = mock(RocksIterator.class);
rocksIterator.seek(key1Bytes.get());
rocksIterator.close();
expectLastCall().times(1);
replay(rocksIterator);
doNothing().when(rocksIterator).seek(key1Bytes.get());
doNothing().when(rocksIterator).close();
final RocksDBRangeIterator rocksDBRangeIterator = new RocksDBRangeIterator(
storeName,
rocksIterator,
@ -395,7 +384,7 @@ public class RocksDBRangeIteratorTest { @@ -395,7 +384,7 @@ public class RocksDBRangeIteratorTest {
);
rocksDBRangeIterator.onClose(() -> { });
rocksDBRangeIterator.close();
verify(rocksIterator);
verify(rocksIterator).close();
}
@Test
@ -418,17 +407,14 @@ public class RocksDBRangeIteratorTest { @@ -418,17 +407,14 @@ public class RocksDBRangeIteratorTest {
@Test
public void shouldExcludeEndOfRange() {
final RocksIterator rocksIterator = mock(RocksIterator.class);
rocksIterator.seek(key1Bytes.get());
expect(rocksIterator.isValid())
.andReturn(true)
.andReturn(true);
expect(rocksIterator.key())
.andReturn(key1Bytes.get())
.andReturn(key2Bytes.get());
expect(rocksIterator.value()).andReturn(valueBytes).times(2);
rocksIterator.next();
expectLastCall().times(2);
replay(rocksIterator);
doNothing().when(rocksIterator).seek(key1Bytes.get());
when(rocksIterator.isValid())
.thenReturn(true);
when(rocksIterator.key())
.thenReturn(key1Bytes.get())
.thenReturn(key2Bytes.get());
when(rocksIterator.value()).thenReturn(valueBytes);
doNothing().when(rocksIterator).next();
final RocksDBRangeIterator rocksDBRangeIterator = new RocksDBRangeIterator(
storeName,
rocksIterator,
@ -440,7 +426,8 @@ public class RocksDBRangeIteratorTest { @@ -440,7 +426,8 @@ public class RocksDBRangeIteratorTest {
assertThat(rocksDBRangeIterator.hasNext(), is(true));
assertThat(rocksDBRangeIterator.next().key, is(key1Bytes));
assertThat(rocksDBRangeIterator.hasNext(), is(false));
verify(rocksIterator);
verify(rocksIterator, times(2)).value();
verify(rocksIterator, times(2)).next();
}
}

53
streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java

@ -22,40 +22,35 @@ import org.apache.kafka.common.utils.MockTime; @@ -22,40 +22,35 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
import org.hamcrest.CoreMatchers;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.Collections;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.when;
@RunWith(EasyMockRunner.class)
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class TimestampedKeyValueStoreBuilderTest {
@Mock(type = MockType.NICE)
@Mock
private KeyValueBytesStoreSupplier supplier;
@Mock(type = MockType.NICE)
@Mock
private RocksDBTimestampedStore inner;
private TimestampedKeyValueStoreBuilder<String, String> builder;
@Before
public void setUp() {
expect(supplier.get()).andReturn(inner);
expect(supplier.name()).andReturn("name");
expect(supplier.metricsScope()).andReturn("metricScope");
expect(inner.persistent()).andReturn(true).anyTimes();
replay(supplier, inner);
when(supplier.get()).thenReturn(inner);
when(supplier.name()).thenReturn("name");
when(supplier.metricsScope()).thenReturn("metricScope");
builder = new TimestampedKeyValueStoreBuilder<>(
supplier,
@ -121,10 +116,7 @@ public class TimestampedKeyValueStoreBuilderTest { @@ -121,10 +116,7 @@ public class TimestampedKeyValueStoreBuilderTest {
@Test
public void shouldNotWrapTimestampedByteStore() {
reset(supplier);
expect(supplier.get()).andReturn(new RocksDBTimestampedStore("name", "metrics-scope"));
expect(supplier.name()).andReturn("name");
replay(supplier);
when(supplier.get()).thenReturn(new RocksDBTimestampedStore("name", "metrics-scope"));
final TimestampedKeyValueStore<String, String> store = builder
.withLoggingDisabled()
@ -135,10 +127,7 @@ public class TimestampedKeyValueStoreBuilderTest { @@ -135,10 +127,7 @@ public class TimestampedKeyValueStoreBuilderTest {
@Test
public void shouldWrapPlainKeyValueStoreAsTimestampStore() {
reset(supplier);
expect(supplier.get()).andReturn(new RocksDBStore("name", "metrics-scope"));
expect(supplier.name()).andReturn("name");
replay(supplier);
when(supplier.get()).thenReturn(new RocksDBStore("name", "metrics-scope"));
final TimestampedKeyValueStore<String, String> store = builder
.withLoggingDisabled()
@ -155,42 +144,24 @@ public class TimestampedKeyValueStoreBuilderTest { @@ -155,42 +144,24 @@ public class TimestampedKeyValueStoreBuilderTest {
@Test
public void shouldNotThrowNullPointerIfKeySerdeIsNull() {
reset(supplier);
expect(supplier.name()).andReturn("name");
expect(supplier.metricsScope()).andReturn("metricScope").anyTimes();
replay(supplier);
// does not throw
new TimestampedKeyValueStoreBuilder<>(supplier, null, Serdes.String(), new MockTime());
}
@Test
public void shouldNotThrowNullPointerIfValueSerdeIsNull() {
reset(supplier);
expect(supplier.name()).andReturn("name");
expect(supplier.metricsScope()).andReturn("metricScope").anyTimes();
replay(supplier);
// does not throw
new TimestampedKeyValueStoreBuilder<>(supplier, Serdes.String(), null, new MockTime());
}
@Test
public void shouldThrowNullPointerIfTimeIsNull() {
reset(supplier);
expect(supplier.name()).andReturn("name");
expect(supplier.metricsScope()).andReturn("metricScope").anyTimes();
replay(supplier);
assertThrows(NullPointerException.class, () -> new TimestampedKeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null));
}
@Test
public void shouldThrowNullPointerIfMetricsScopeIsNull() {
reset(supplier);
expect(supplier.get()).andReturn(new RocksDBTimestampedStore("name", null));
expect(supplier.name()).andReturn("name");
replay(supplier);
when(supplier.metricsScope()).thenReturn(null);
final Exception e = assertThrows(NullPointerException.class,
() -> new TimestampedKeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime()));

13
streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java

@ -26,6 +26,8 @@ import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder; @@ -26,6 +26,8 @@ import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import java.io.File;
import java.util.HashSet;
@ -34,15 +36,15 @@ import java.util.Set; @@ -34,15 +36,15 @@ import java.util.Set;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.replay;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class TimestampedSegmentTest {
private final RocksDBMetricsRecorder metricsRecorder =
@ -63,9 +65,8 @@ public class TimestampedSegmentTest { @@ -63,9 +65,8 @@ public class TimestampedSegmentTest {
final File directory = new File(directoryPath);
final ProcessorContext mockContext = mock(ProcessorContext.class);
expect(mockContext.appConfigs()).andReturn(mkMap(mkEntry(METRICS_RECORDING_LEVEL_CONFIG, "INFO")));
expect(mockContext.stateDir()).andReturn(directory);
replay(mockContext);
when(mockContext.appConfigs()).thenReturn(mkMap(mkEntry(METRICS_RECORDING_LEVEL_CONFIG, "INFO")));
when(mockContext.stateDir()).thenReturn(directory);
segment.openDB(mockContext.appConfigs(), mockContext.stateDir());

Loading…
Cancel
Save