|
|
|
@ -17,28 +17,28 @@
@@ -17,28 +17,28 @@
|
|
|
|
|
package org.apache.kafka.streams; |
|
|
|
|
|
|
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord; |
|
|
|
|
import org.apache.kafka.streams.TopologyTestDriver.KeyValueStoreFacade; |
|
|
|
|
import org.apache.kafka.streams.processor.ProcessorContext; |
|
|
|
|
import org.apache.kafka.streams.processor.StateStore; |
|
|
|
|
import org.apache.kafka.streams.processor.StateStoreContext; |
|
|
|
|
import org.apache.kafka.streams.state.TimestampedKeyValueStore; |
|
|
|
|
import org.apache.kafka.streams.state.ValueAndTimestamp; |
|
|
|
|
import org.apache.kafka.streams.TopologyTestDriver.KeyValueStoreFacade; |
|
|
|
|
import org.easymock.EasyMock; |
|
|
|
|
import org.junit.jupiter.api.BeforeEach; |
|
|
|
|
import org.junit.jupiter.api.Test; |
|
|
|
|
|
|
|
|
|
import static java.util.Arrays.asList; |
|
|
|
|
import static org.easymock.EasyMock.expect; |
|
|
|
|
import static org.easymock.EasyMock.expectLastCall; |
|
|
|
|
import static org.easymock.EasyMock.mock; |
|
|
|
|
import static org.easymock.EasyMock.replay; |
|
|
|
|
import static org.easymock.EasyMock.verify; |
|
|
|
|
import static org.hamcrest.CoreMatchers.is; |
|
|
|
|
import static org.hamcrest.MatcherAssert.assertThat; |
|
|
|
|
import static org.junit.jupiter.api.Assertions.assertNull; |
|
|
|
|
import static org.mockito.Mockito.doReturn; |
|
|
|
|
import static org.mockito.Mockito.mock; |
|
|
|
|
import static org.mockito.Mockito.times; |
|
|
|
|
import static org.mockito.Mockito.verify; |
|
|
|
|
import static org.mockito.Mockito.when; |
|
|
|
|
|
|
|
|
|
public class KeyValueStoreFacadeTest { |
|
|
|
|
private final TimestampedKeyValueStore<String, String> mockedKeyValueTimestampStore = EasyMock.mock(TimestampedKeyValueStore.class); |
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
|
private final TimestampedKeyValueStore<String, String> mockedKeyValueTimestampStore = mock(TimestampedKeyValueStore.class); |
|
|
|
|
|
|
|
|
|
private KeyValueStoreFacade<String, String> keyValueStoreFacade; |
|
|
|
|
|
|
|
|
@ -52,124 +52,98 @@ public class KeyValueStoreFacadeTest {
@@ -52,124 +52,98 @@ public class KeyValueStoreFacadeTest {
|
|
|
|
|
public void shouldForwardDeprecatedInit() { |
|
|
|
|
final ProcessorContext context = mock(ProcessorContext.class); |
|
|
|
|
final StateStore store = mock(StateStore.class); |
|
|
|
|
mockedKeyValueTimestampStore.init(context, store); |
|
|
|
|
expectLastCall(); |
|
|
|
|
replay(mockedKeyValueTimestampStore); |
|
|
|
|
|
|
|
|
|
keyValueStoreFacade.init(context, store); |
|
|
|
|
verify(mockedKeyValueTimestampStore); |
|
|
|
|
verify(mockedKeyValueTimestampStore).init(context, store); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldForwardInit() { |
|
|
|
|
final StateStoreContext context = mock(StateStoreContext.class); |
|
|
|
|
final StateStore store = mock(StateStore.class); |
|
|
|
|
mockedKeyValueTimestampStore.init(context, store); |
|
|
|
|
expectLastCall(); |
|
|
|
|
replay(mockedKeyValueTimestampStore); |
|
|
|
|
|
|
|
|
|
keyValueStoreFacade.init(context, store); |
|
|
|
|
verify(mockedKeyValueTimestampStore); |
|
|
|
|
verify(mockedKeyValueTimestampStore).init(context, store); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldPutWithUnknownTimestamp() { |
|
|
|
|
mockedKeyValueTimestampStore.put("key", ValueAndTimestamp.make("value", ConsumerRecord.NO_TIMESTAMP)); |
|
|
|
|
expectLastCall(); |
|
|
|
|
replay(mockedKeyValueTimestampStore); |
|
|
|
|
|
|
|
|
|
keyValueStoreFacade.put("key", "value"); |
|
|
|
|
verify(mockedKeyValueTimestampStore); |
|
|
|
|
verify(mockedKeyValueTimestampStore) |
|
|
|
|
.put("key", ValueAndTimestamp.make("value", ConsumerRecord.NO_TIMESTAMP)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldPutIfAbsentWithUnknownTimestamp() { |
|
|
|
|
expect(mockedKeyValueTimestampStore.putIfAbsent("key", ValueAndTimestamp.make("value", ConsumerRecord.NO_TIMESTAMP))) |
|
|
|
|
.andReturn(null) |
|
|
|
|
.andReturn(ValueAndTimestamp.make("oldValue", 42L)); |
|
|
|
|
replay(mockedKeyValueTimestampStore); |
|
|
|
|
doReturn(null, ValueAndTimestamp.make("oldValue", 42L)) |
|
|
|
|
.when(mockedKeyValueTimestampStore) |
|
|
|
|
.putIfAbsent("key", ValueAndTimestamp.make("value", ConsumerRecord.NO_TIMESTAMP)); |
|
|
|
|
|
|
|
|
|
assertNull(keyValueStoreFacade.putIfAbsent("key", "value")); |
|
|
|
|
assertThat(keyValueStoreFacade.putIfAbsent("key", "value"), is("oldValue")); |
|
|
|
|
verify(mockedKeyValueTimestampStore); |
|
|
|
|
verify(mockedKeyValueTimestampStore, times(2)) |
|
|
|
|
.putIfAbsent("key", ValueAndTimestamp.make("value", ConsumerRecord.NO_TIMESTAMP)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldPutAllWithUnknownTimestamp() { |
|
|
|
|
mockedKeyValueTimestampStore.put("key1", ValueAndTimestamp.make("value1", ConsumerRecord.NO_TIMESTAMP)); |
|
|
|
|
mockedKeyValueTimestampStore.put("key2", ValueAndTimestamp.make("value2", ConsumerRecord.NO_TIMESTAMP)); |
|
|
|
|
expectLastCall(); |
|
|
|
|
replay(mockedKeyValueTimestampStore); |
|
|
|
|
|
|
|
|
|
keyValueStoreFacade.putAll(asList( |
|
|
|
|
KeyValue.pair("key1", "value1"), |
|
|
|
|
KeyValue.pair("key2", "value2") |
|
|
|
|
)); |
|
|
|
|
verify(mockedKeyValueTimestampStore); |
|
|
|
|
verify(mockedKeyValueTimestampStore) |
|
|
|
|
.put("key1", ValueAndTimestamp.make("value1", ConsumerRecord.NO_TIMESTAMP)); |
|
|
|
|
verify(mockedKeyValueTimestampStore) |
|
|
|
|
.put("key2", ValueAndTimestamp.make("value2", ConsumerRecord.NO_TIMESTAMP)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldDeleteAndReturnPlainValue() { |
|
|
|
|
expect(mockedKeyValueTimestampStore.delete("key")) |
|
|
|
|
.andReturn(null) |
|
|
|
|
.andReturn(ValueAndTimestamp.make("oldValue", 42L)); |
|
|
|
|
replay(mockedKeyValueTimestampStore); |
|
|
|
|
doReturn(null, ValueAndTimestamp.make("oldValue", 42L)) |
|
|
|
|
.when(mockedKeyValueTimestampStore).delete("key"); |
|
|
|
|
|
|
|
|
|
assertNull(keyValueStoreFacade.delete("key")); |
|
|
|
|
assertThat(keyValueStoreFacade.delete("key"), is("oldValue")); |
|
|
|
|
verify(mockedKeyValueTimestampStore); |
|
|
|
|
verify(mockedKeyValueTimestampStore, times(2)).delete("key"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldForwardFlush() { |
|
|
|
|
mockedKeyValueTimestampStore.flush(); |
|
|
|
|
expectLastCall(); |
|
|
|
|
replay(mockedKeyValueTimestampStore); |
|
|
|
|
|
|
|
|
|
keyValueStoreFacade.flush(); |
|
|
|
|
verify(mockedKeyValueTimestampStore); |
|
|
|
|
verify(mockedKeyValueTimestampStore).flush(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldForwardClose() { |
|
|
|
|
mockedKeyValueTimestampStore.close(); |
|
|
|
|
expectLastCall(); |
|
|
|
|
replay(mockedKeyValueTimestampStore); |
|
|
|
|
|
|
|
|
|
keyValueStoreFacade.close(); |
|
|
|
|
verify(mockedKeyValueTimestampStore); |
|
|
|
|
verify(mockedKeyValueTimestampStore).close(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldReturnName() { |
|
|
|
|
expect(mockedKeyValueTimestampStore.name()).andReturn("name"); |
|
|
|
|
replay(mockedKeyValueTimestampStore); |
|
|
|
|
when(mockedKeyValueTimestampStore.name()).thenReturn("name"); |
|
|
|
|
|
|
|
|
|
assertThat(keyValueStoreFacade.name(), is("name")); |
|
|
|
|
verify(mockedKeyValueTimestampStore); |
|
|
|
|
verify(mockedKeyValueTimestampStore).name(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldReturnIsPersistent() { |
|
|
|
|
expect(mockedKeyValueTimestampStore.persistent()) |
|
|
|
|
.andReturn(true) |
|
|
|
|
.andReturn(false); |
|
|
|
|
replay(mockedKeyValueTimestampStore); |
|
|
|
|
when(mockedKeyValueTimestampStore.persistent()) |
|
|
|
|
.thenReturn(true, false); |
|
|
|
|
|
|
|
|
|
assertThat(keyValueStoreFacade.persistent(), is(true)); |
|
|
|
|
assertThat(keyValueStoreFacade.persistent(), is(false)); |
|
|
|
|
verify(mockedKeyValueTimestampStore); |
|
|
|
|
verify(mockedKeyValueTimestampStore, times(2)).persistent(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldReturnIsOpen() { |
|
|
|
|
expect(mockedKeyValueTimestampStore.isOpen()) |
|
|
|
|
.andReturn(true) |
|
|
|
|
.andReturn(false); |
|
|
|
|
replay(mockedKeyValueTimestampStore); |
|
|
|
|
when(mockedKeyValueTimestampStore.isOpen()) |
|
|
|
|
.thenReturn(true, false); |
|
|
|
|
|
|
|
|
|
assertThat(keyValueStoreFacade.isOpen(), is(true)); |
|
|
|
|
assertThat(keyValueStoreFacade.isOpen(), is(false)); |
|
|
|
|
verify(mockedKeyValueTimestampStore); |
|
|
|
|
verify(mockedKeyValueTimestampStore, times(2)).isOpen(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|