|
|
|
@ -22,13 +22,14 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
@@ -22,13 +22,14 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
|
|
|
|
|
import org.apache.kafka.streams.state.ValueAndTimestamp; |
|
|
|
|
import org.apache.kafka.streams.state.internals.WrappedStateStore; |
|
|
|
|
import org.junit.Test; |
|
|
|
|
import org.junit.runner.RunWith; |
|
|
|
|
import org.mockito.junit.MockitoJUnitRunner; |
|
|
|
|
|
|
|
|
|
import static org.easymock.EasyMock.expect; |
|
|
|
|
import static org.easymock.EasyMock.expectLastCall; |
|
|
|
|
import static org.easymock.EasyMock.mock; |
|
|
|
|
import static org.easymock.EasyMock.replay; |
|
|
|
|
import static org.easymock.EasyMock.verify; |
|
|
|
|
import static org.mockito.Mockito.doNothing; |
|
|
|
|
import static org.mockito.Mockito.mock; |
|
|
|
|
import static org.mockito.Mockito.when; |
|
|
|
|
|
|
|
|
|
@RunWith(MockitoJUnitRunner.StrictStubs.class) |
|
|
|
|
public class TimestampedTupleForwarderTest { |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
@ -38,11 +39,12 @@ public class TimestampedTupleForwarderTest {
@@ -38,11 +39,12 @@ public class TimestampedTupleForwarderTest {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void setFlushListener(final boolean sendOldValues) { |
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
|
final WrappedStateStore<StateStore, Object, ValueAndTimestamp<Object>> store = mock(WrappedStateStore.class); |
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
|
final TimestampedCacheFlushListener<Object, Object> flushListener = mock(TimestampedCacheFlushListener.class); |
|
|
|
|
|
|
|
|
|
expect(store.setFlushListener(flushListener, sendOldValues)).andReturn(false); |
|
|
|
|
replay(store); |
|
|
|
|
when(store.setFlushListener(flushListener, sendOldValues)).thenReturn(false); |
|
|
|
|
|
|
|
|
|
new TimestampedTupleForwarder<>( |
|
|
|
|
store, |
|
|
|
@ -50,8 +52,6 @@ public class TimestampedTupleForwarderTest {
@@ -50,8 +52,6 @@ public class TimestampedTupleForwarderTest {
|
|
|
|
|
flushListener, |
|
|
|
|
sendOldValues |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
verify(store); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
@ -61,19 +61,19 @@ public class TimestampedTupleForwarderTest {
@@ -61,19 +61,19 @@ public class TimestampedTupleForwarderTest {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean sendOldValues) { |
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
|
final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class); |
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
|
final InternalProcessorContext<String, Change<String>> context = mock(InternalProcessorContext.class); |
|
|
|
|
|
|
|
|
|
expect(store.setFlushListener(null, sendOldValues)).andReturn(false); |
|
|
|
|
when(store.setFlushListener(null, sendOldValues)).thenReturn(false); |
|
|
|
|
if (sendOldValues) { |
|
|
|
|
context.forward(new Record<>("key1", new Change<>("newValue1", "oldValue1", true), 0L)); |
|
|
|
|
context.forward(new Record<>("key2", new Change<>("newValue2", "oldValue2", false), 42L)); |
|
|
|
|
doNothing().when(context).forward(new Record<>("key1", new Change<>("newValue1", "oldValue1", true), 0L)); |
|
|
|
|
doNothing().when(context).forward(new Record<>("key2", new Change<>("newValue2", "oldValue2", false), 42L)); |
|
|
|
|
} else { |
|
|
|
|
context.forward(new Record<>("key1", new Change<>("newValue1", null, true), 0L)); |
|
|
|
|
context.forward(new Record<>("key2", new Change<>("newValue2", null, false), 42L)); |
|
|
|
|
doNothing().when(context).forward(new Record<>("key1", new Change<>("newValue1", null, true), 0L)); |
|
|
|
|
doNothing().when(context).forward(new Record<>("key2", new Change<>("newValue2", null, false), 42L)); |
|
|
|
|
} |
|
|
|
|
expectLastCall(); |
|
|
|
|
replay(store, context); |
|
|
|
|
|
|
|
|
|
final TimestampedTupleForwarder<String, String> forwarder = |
|
|
|
|
new TimestampedTupleForwarder<>( |
|
|
|
@ -84,17 +84,16 @@ public class TimestampedTupleForwarderTest {
@@ -84,17 +84,16 @@ public class TimestampedTupleForwarderTest {
|
|
|
|
|
); |
|
|
|
|
forwarder.maybeForward(new Record<>("key1", new Change<>("newValue1", "oldValue1", true), 0L)); |
|
|
|
|
forwarder.maybeForward(new Record<>("key2", new Change<>("newValue2", "oldValue2", false), 42L)); |
|
|
|
|
|
|
|
|
|
verify(store, context); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldNotForwardRecordsIfWrappedStateStoreDoesCache() { |
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
|
final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class); |
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
|
final InternalProcessorContext<String, Change<String>> context = mock(InternalProcessorContext.class); |
|
|
|
|
|
|
|
|
|
expect(store.setFlushListener(null, false)).andReturn(true); |
|
|
|
|
replay(store, context); |
|
|
|
|
when(store.setFlushListener(null, false)).thenReturn(true); |
|
|
|
|
|
|
|
|
|
final TimestampedTupleForwarder<String, String> forwarder = |
|
|
|
|
new TimestampedTupleForwarder<>( |
|
|
|
@ -105,7 +104,5 @@ public class TimestampedTupleForwarderTest {
@@ -105,7 +104,5 @@ public class TimestampedTupleForwarderTest {
|
|
|
|
|
); |
|
|
|
|
forwarder.maybeForward(new Record<>("key", new Change<>("newValue", "oldValue", true), 0L)); |
|
|
|
|
forwarder.maybeForward(new Record<>("key", new Change<>("newValue", "oldValue", true), 42L)); |
|
|
|
|
|
|
|
|
|
verify(store, context); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|