|
|
|
@ -44,12 +44,11 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -44,12 +44,11 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
|
|
|
|
import org.apache.kafka.streams.state.KeyValueIterator; |
|
|
|
|
import org.apache.kafka.streams.state.SessionStore; |
|
|
|
|
import org.apache.kafka.test.KeyValueIteratorStub; |
|
|
|
|
import org.easymock.EasyMockRule; |
|
|
|
|
import org.easymock.Mock; |
|
|
|
|
import org.easymock.MockType; |
|
|
|
|
import org.junit.Before; |
|
|
|
|
import org.junit.Rule; |
|
|
|
|
import org.junit.Test; |
|
|
|
|
import org.junit.runner.RunWith; |
|
|
|
|
import org.mockito.Mock; |
|
|
|
|
import org.mockito.junit.MockitoJUnitRunner; |
|
|
|
|
|
|
|
|
|
import java.util.Collections; |
|
|
|
|
import java.util.List; |
|
|
|
@ -58,15 +57,6 @@ import java.util.stream.Collectors;
@@ -58,15 +57,6 @@ import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
|
|
import static org.apache.kafka.common.utils.Utils.mkEntry; |
|
|
|
|
import static org.apache.kafka.common.utils.Utils.mkMap; |
|
|
|
|
import static org.easymock.EasyMock.anyObject; |
|
|
|
|
import static org.easymock.EasyMock.aryEq; |
|
|
|
|
import static org.easymock.EasyMock.eq; |
|
|
|
|
import static org.easymock.EasyMock.expect; |
|
|
|
|
import static org.easymock.EasyMock.expectLastCall; |
|
|
|
|
import static org.easymock.EasyMock.mock; |
|
|
|
|
import static org.easymock.EasyMock.niceMock; |
|
|
|
|
import static org.easymock.EasyMock.replay; |
|
|
|
|
import static org.easymock.EasyMock.verify; |
|
|
|
|
import static org.hamcrest.CoreMatchers.equalTo; |
|
|
|
|
import static org.hamcrest.MatcherAssert.assertThat; |
|
|
|
|
import static org.hamcrest.Matchers.empty; |
|
|
|
@ -75,12 +65,16 @@ import static org.junit.Assert.assertFalse;
@@ -75,12 +65,16 @@ import static org.junit.Assert.assertFalse;
|
|
|
|
|
import static org.junit.Assert.assertNull; |
|
|
|
|
import static org.junit.Assert.assertThrows; |
|
|
|
|
import static org.junit.Assert.assertTrue; |
|
|
|
|
|
|
|
|
|
import static org.mockito.ArgumentMatchers.any; |
|
|
|
|
import static org.mockito.ArgumentMatchers.eq; |
|
|
|
|
import static org.mockito.Mockito.doNothing; |
|
|
|
|
import static org.mockito.Mockito.doThrow; |
|
|
|
|
import static org.mockito.Mockito.mock; |
|
|
|
|
import static org.mockito.Mockito.when; |
|
|
|
|
|
|
|
|
|
@RunWith(MockitoJUnitRunner.StrictStubs.class) |
|
|
|
|
public class MeteredSessionStoreTest { |
|
|
|
|
|
|
|
|
|
@Rule |
|
|
|
|
public EasyMockRule rule = new EasyMockRule(this); |
|
|
|
|
|
|
|
|
|
private static final String APPLICATION_ID = "test-app"; |
|
|
|
|
private static final String STORE_TYPE = "scope"; |
|
|
|
|
private static final String STORE_NAME = "mocked-store"; |
|
|
|
@ -101,9 +95,9 @@ public class MeteredSessionStoreTest {
@@ -101,9 +95,9 @@ public class MeteredSessionStoreTest {
|
|
|
|
|
private final TaskId taskId = new TaskId(0, 0, "My-Topology"); |
|
|
|
|
private final Metrics metrics = new Metrics(); |
|
|
|
|
private MeteredSessionStore<String, String> store; |
|
|
|
|
@Mock(type = MockType.NICE) |
|
|
|
|
@Mock |
|
|
|
|
private SessionStore<Bytes, byte[]> innerStore; |
|
|
|
|
@Mock(type = MockType.NICE) |
|
|
|
|
@Mock |
|
|
|
|
private InternalProcessorContext context; |
|
|
|
|
|
|
|
|
|
private Map<String, String> tags; |
|
|
|
@ -119,12 +113,12 @@ public class MeteredSessionStoreTest {
@@ -119,12 +113,12 @@ public class MeteredSessionStoreTest {
|
|
|
|
|
mockTime |
|
|
|
|
); |
|
|
|
|
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG); |
|
|
|
|
expect(context.applicationId()).andStubReturn(APPLICATION_ID); |
|
|
|
|
expect(context.metrics()) |
|
|
|
|
.andStubReturn(new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, mockTime)); |
|
|
|
|
expect(context.taskId()).andStubReturn(taskId); |
|
|
|
|
expect(context.changelogFor(STORE_NAME)).andStubReturn(CHANGELOG_TOPIC); |
|
|
|
|
expect(innerStore.name()).andStubReturn(STORE_NAME); |
|
|
|
|
when(context.applicationId()).thenReturn(APPLICATION_ID); |
|
|
|
|
when(context.metrics()) |
|
|
|
|
.thenReturn(new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, mockTime)); |
|
|
|
|
when(context.taskId()).thenReturn(taskId); |
|
|
|
|
when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC); |
|
|
|
|
when(innerStore.name()).thenReturn(STORE_NAME); |
|
|
|
|
tags = mkMap( |
|
|
|
|
mkEntry(THREAD_ID_TAG_KEY, threadId), |
|
|
|
|
mkEntry("task-id", taskId.toString()), |
|
|
|
@ -133,45 +127,34 @@ public class MeteredSessionStoreTest {
@@ -133,45 +127,34 @@ public class MeteredSessionStoreTest {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void init() { |
|
|
|
|
replay(innerStore, context); |
|
|
|
|
store.init((StateStoreContext) context, store); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@SuppressWarnings("deprecation") |
|
|
|
|
@Test |
|
|
|
|
public void shouldDelegateDeprecatedInit() { |
|
|
|
|
final SessionStore<Bytes, byte[]> inner = mock(SessionStore.class); |
|
|
|
|
final MeteredSessionStore<String, String> outer = new MeteredSessionStore<>( |
|
|
|
|
inner, |
|
|
|
|
innerStore, |
|
|
|
|
STORE_TYPE, |
|
|
|
|
Serdes.String(), |
|
|
|
|
Serdes.String(), |
|
|
|
|
new MockTime() |
|
|
|
|
); |
|
|
|
|
expect(inner.name()).andStubReturn("store"); |
|
|
|
|
inner.init((ProcessorContext) context, outer); |
|
|
|
|
expectLastCall(); |
|
|
|
|
replay(inner, context); |
|
|
|
|
doNothing().when(innerStore).init((ProcessorContext) context, outer); |
|
|
|
|
outer.init((ProcessorContext) context, outer); |
|
|
|
|
verify(inner); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldDelegateInit() { |
|
|
|
|
final SessionStore<Bytes, byte[]> inner = mock(SessionStore.class); |
|
|
|
|
final MeteredSessionStore<String, String> outer = new MeteredSessionStore<>( |
|
|
|
|
inner, |
|
|
|
|
innerStore, |
|
|
|
|
STORE_TYPE, |
|
|
|
|
Serdes.String(), |
|
|
|
|
Serdes.String(), |
|
|
|
|
new MockTime() |
|
|
|
|
); |
|
|
|
|
expect(inner.name()).andStubReturn("store"); |
|
|
|
|
inner.init((StateStoreContext) context, outer); |
|
|
|
|
expectLastCall(); |
|
|
|
|
replay(inner, context); |
|
|
|
|
doNothing().when(innerStore).init((StateStoreContext) context, outer); |
|
|
|
|
outer.init((StateStoreContext) context, outer); |
|
|
|
|
verify(inner); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
@ -183,24 +166,24 @@ public class MeteredSessionStoreTest {
@@ -183,24 +166,24 @@ public class MeteredSessionStoreTest {
|
|
|
|
|
public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() { |
|
|
|
|
final String defaultChangelogTopicName = |
|
|
|
|
ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME, taskId.topologyName()); |
|
|
|
|
expect(context.changelogFor(STORE_NAME)).andReturn(null); |
|
|
|
|
when(context.changelogFor(STORE_NAME)).thenReturn(null); |
|
|
|
|
doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
|
private void doShouldPassChangelogTopicNameToStateStoreSerde(final String topic) { |
|
|
|
|
final Serde<String> keySerde = niceMock(Serde.class); |
|
|
|
|
final Serde<String> keySerde = mock(Serde.class); |
|
|
|
|
final Serializer<String> keySerializer = mock(Serializer.class); |
|
|
|
|
final Serde<String> valueSerde = niceMock(Serde.class); |
|
|
|
|
final Serde<String> valueSerde = mock(Serde.class); |
|
|
|
|
final Deserializer<String> valueDeserializer = mock(Deserializer.class); |
|
|
|
|
final Serializer<String> valueSerializer = mock(Serializer.class); |
|
|
|
|
expect(keySerde.serializer()).andStubReturn(keySerializer); |
|
|
|
|
expect(keySerializer.serialize(topic, KEY)).andStubReturn(KEY.getBytes()); |
|
|
|
|
expect(valueSerde.deserializer()).andStubReturn(valueDeserializer); |
|
|
|
|
expect(valueDeserializer.deserialize(topic, VALUE_BYTES)).andStubReturn(VALUE); |
|
|
|
|
expect(valueSerde.serializer()).andStubReturn(valueSerializer); |
|
|
|
|
expect(valueSerializer.serialize(topic, VALUE)).andStubReturn(VALUE_BYTES); |
|
|
|
|
expect(innerStore.fetchSession(KEY_BYTES, START_TIMESTAMP, END_TIMESTAMP)).andStubReturn(VALUE_BYTES); |
|
|
|
|
replay(innerStore, context, keySerializer, keySerde, valueDeserializer, valueSerializer, valueSerde); |
|
|
|
|
when(keySerde.serializer()).thenReturn(keySerializer); |
|
|
|
|
when(keySerializer.serialize(topic, KEY)).thenReturn(KEY.getBytes()); |
|
|
|
|
when(valueSerde.deserializer()).thenReturn(valueDeserializer); |
|
|
|
|
when(valueDeserializer.deserialize(topic, VALUE_BYTES)).thenReturn(VALUE); |
|
|
|
|
when(valueSerde.serializer()).thenReturn(valueSerializer); |
|
|
|
|
when(valueSerializer.serialize(topic, VALUE)).thenReturn(VALUE_BYTES); |
|
|
|
|
when(innerStore.fetchSession(KEY_BYTES, START_TIMESTAMP, END_TIMESTAMP)).thenReturn(VALUE_BYTES); |
|
|
|
|
store = new MeteredSessionStore<>( |
|
|
|
|
innerStore, |
|
|
|
|
STORE_TYPE, |
|
|
|
@ -212,8 +195,6 @@ public class MeteredSessionStoreTest {
@@ -212,8 +195,6 @@ public class MeteredSessionStoreTest {
|
|
|
|
|
|
|
|
|
|
store.fetchSession(KEY, START_TIMESTAMP, END_TIMESTAMP); |
|
|
|
|
store.put(WINDOWED_KEY, VALUE); |
|
|
|
|
|
|
|
|
|
verify(keySerializer, valueDeserializer, valueSerializer); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
@ -237,8 +218,7 @@ public class MeteredSessionStoreTest {
@@ -237,8 +218,7 @@ public class MeteredSessionStoreTest {
|
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldWriteBytesToInnerStoreAndRecordPutMetric() { |
|
|
|
|
innerStore.put(eq(WINDOWED_KEY_BYTES), aryEq(VALUE_BYTES)); |
|
|
|
|
expectLastCall(); |
|
|
|
|
doNothing().when(innerStore).put(WINDOWED_KEY_BYTES, VALUE_BYTES); |
|
|
|
|
init(); |
|
|
|
|
|
|
|
|
|
store.put(WINDOWED_KEY, VALUE); |
|
|
|
@ -247,13 +227,12 @@ public class MeteredSessionStoreTest {
@@ -247,13 +227,12 @@ public class MeteredSessionStoreTest {
|
|
|
|
|
// and the sensor is tested elsewhere
|
|
|
|
|
final KafkaMetric metric = metric("put-rate"); |
|
|
|
|
assertTrue(((Double) metric.metricValue()) > 0); |
|
|
|
|
verify(innerStore); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldFindSessionsFromStoreAndRecordFetchMetric() { |
|
|
|
|
expect(innerStore.findSessions(KEY_BYTES, 0, 0)) |
|
|
|
|
.andReturn(new KeyValueIteratorStub<>( |
|
|
|
|
when(innerStore.findSessions(KEY_BYTES, 0, 0)) |
|
|
|
|
.thenReturn(new KeyValueIteratorStub<>( |
|
|
|
|
Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator())); |
|
|
|
|
init(); |
|
|
|
|
|
|
|
|
@ -266,13 +245,12 @@ public class MeteredSessionStoreTest {
@@ -266,13 +245,12 @@ public class MeteredSessionStoreTest {
|
|
|
|
|
// and the sensor is tested elsewhere
|
|
|
|
|
final KafkaMetric metric = metric("fetch-rate"); |
|
|
|
|
assertTrue((Double) metric.metricValue() > 0); |
|
|
|
|
verify(innerStore); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldBackwardFindSessionsFromStoreAndRecordFetchMetric() { |
|
|
|
|
expect(innerStore.backwardFindSessions(KEY_BYTES, 0, 0)) |
|
|
|
|
.andReturn( |
|
|
|
|
when(innerStore.backwardFindSessions(KEY_BYTES, 0, 0)) |
|
|
|
|
.thenReturn( |
|
|
|
|
new KeyValueIteratorStub<>( |
|
|
|
|
Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator() |
|
|
|
|
) |
|
|
|
@ -288,13 +266,12 @@ public class MeteredSessionStoreTest {
@@ -288,13 +266,12 @@ public class MeteredSessionStoreTest {
|
|
|
|
|
// and the sensor is tested elsewhere
|
|
|
|
|
final KafkaMetric metric = metric("fetch-rate"); |
|
|
|
|
assertTrue((Double) metric.metricValue() > 0); |
|
|
|
|
verify(innerStore); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldFindSessionRangeFromStoreAndRecordFetchMetric() { |
|
|
|
|
expect(innerStore.findSessions(KEY_BYTES, KEY_BYTES, 0, 0)) |
|
|
|
|
.andReturn(new KeyValueIteratorStub<>( |
|
|
|
|
when(innerStore.findSessions(KEY_BYTES, KEY_BYTES, 0, 0)) |
|
|
|
|
.thenReturn(new KeyValueIteratorStub<>( |
|
|
|
|
Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator())); |
|
|
|
|
init(); |
|
|
|
|
|
|
|
|
@ -307,13 +284,12 @@ public class MeteredSessionStoreTest {
@@ -307,13 +284,12 @@ public class MeteredSessionStoreTest {
|
|
|
|
|
// and the sensor is tested elsewhere
|
|
|
|
|
final KafkaMetric metric = metric("fetch-rate"); |
|
|
|
|
assertTrue((Double) metric.metricValue() > 0); |
|
|
|
|
verify(innerStore); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldBackwardFindSessionRangeFromStoreAndRecordFetchMetric() { |
|
|
|
|
expect(innerStore.backwardFindSessions(KEY_BYTES, KEY_BYTES, 0, 0)) |
|
|
|
|
.andReturn( |
|
|
|
|
when(innerStore.backwardFindSessions(KEY_BYTES, KEY_BYTES, 0, 0)) |
|
|
|
|
.thenReturn( |
|
|
|
|
new KeyValueIteratorStub<>( |
|
|
|
|
Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator() |
|
|
|
|
) |
|
|
|
@ -329,13 +305,11 @@ public class MeteredSessionStoreTest {
@@ -329,13 +305,11 @@ public class MeteredSessionStoreTest {
|
|
|
|
|
// and the sensor is tested elsewhere
|
|
|
|
|
final KafkaMetric metric = metric("fetch-rate"); |
|
|
|
|
assertTrue((Double) metric.metricValue() > 0); |
|
|
|
|
verify(innerStore); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldRemoveFromStoreAndRecordRemoveMetric() { |
|
|
|
|
innerStore.remove(WINDOWED_KEY_BYTES); |
|
|
|
|
expectLastCall(); |
|
|
|
|
doNothing().when(innerStore).remove(WINDOWED_KEY_BYTES); |
|
|
|
|
|
|
|
|
|
init(); |
|
|
|
|
|
|
|
|
@ -345,13 +319,12 @@ public class MeteredSessionStoreTest {
@@ -345,13 +319,12 @@ public class MeteredSessionStoreTest {
|
|
|
|
|
// and the sensor is tested elsewhere
|
|
|
|
|
final KafkaMetric metric = metric("remove-rate"); |
|
|
|
|
assertTrue((Double) metric.metricValue() > 0); |
|
|
|
|
verify(innerStore); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldFetchForKeyAndRecordFetchMetric() { |
|
|
|
|
expect(innerStore.fetch(KEY_BYTES)) |
|
|
|
|
.andReturn(new KeyValueIteratorStub<>( |
|
|
|
|
when(innerStore.fetch(KEY_BYTES)) |
|
|
|
|
.thenReturn(new KeyValueIteratorStub<>( |
|
|
|
|
Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator())); |
|
|
|
|
init(); |
|
|
|
|
|
|
|
|
@ -364,13 +337,12 @@ public class MeteredSessionStoreTest {
@@ -364,13 +337,12 @@ public class MeteredSessionStoreTest {
|
|
|
|
|
// and the sensor is tested elsewhere
|
|
|
|
|
final KafkaMetric metric = metric("fetch-rate"); |
|
|
|
|
assertTrue((Double) metric.metricValue() > 0); |
|
|
|
|
verify(innerStore); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldBackwardFetchForKeyAndRecordFetchMetric() { |
|
|
|
|
expect(innerStore.backwardFetch(KEY_BYTES)) |
|
|
|
|
.andReturn( |
|
|
|
|
when(innerStore.backwardFetch(KEY_BYTES)) |
|
|
|
|
.thenReturn( |
|
|
|
|
new KeyValueIteratorStub<>( |
|
|
|
|
Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator() |
|
|
|
|
) |
|
|
|
@ -386,13 +358,12 @@ public class MeteredSessionStoreTest {
@@ -386,13 +358,12 @@ public class MeteredSessionStoreTest {
|
|
|
|
|
// and the sensor is tested elsewhere
|
|
|
|
|
final KafkaMetric metric = metric("fetch-rate"); |
|
|
|
|
assertTrue((Double) metric.metricValue() > 0); |
|
|
|
|
verify(innerStore); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldFetchRangeFromStoreAndRecordFetchMetric() { |
|
|
|
|
expect(innerStore.fetch(KEY_BYTES, KEY_BYTES)) |
|
|
|
|
.andReturn(new KeyValueIteratorStub<>( |
|
|
|
|
when(innerStore.fetch(KEY_BYTES, KEY_BYTES)) |
|
|
|
|
.thenReturn(new KeyValueIteratorStub<>( |
|
|
|
|
Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator())); |
|
|
|
|
init(); |
|
|
|
|
|
|
|
|
@ -405,13 +376,12 @@ public class MeteredSessionStoreTest {
@@ -405,13 +376,12 @@ public class MeteredSessionStoreTest {
|
|
|
|
|
// and the sensor is tested elsewhere
|
|
|
|
|
final KafkaMetric metric = metric("fetch-rate"); |
|
|
|
|
assertTrue((Double) metric.metricValue() > 0); |
|
|
|
|
verify(innerStore); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldBackwardFetchRangeFromStoreAndRecordFetchMetric() { |
|
|
|
|
expect(innerStore.backwardFetch(KEY_BYTES, KEY_BYTES)) |
|
|
|
|
.andReturn( |
|
|
|
|
when(innerStore.backwardFetch(KEY_BYTES, KEY_BYTES)) |
|
|
|
|
.thenReturn( |
|
|
|
|
new KeyValueIteratorStub<>( |
|
|
|
|
Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator() |
|
|
|
|
) |
|
|
|
@ -427,14 +397,13 @@ public class MeteredSessionStoreTest {
@@ -427,14 +397,13 @@ public class MeteredSessionStoreTest {
|
|
|
|
|
// and the sensor is tested elsewhere
|
|
|
|
|
final KafkaMetric metric = metric("fetch-rate"); |
|
|
|
|
assertTrue((Double) metric.metricValue() > 0); |
|
|
|
|
verify(innerStore); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldReturnNoSessionsWhenFetchedKeyHasExpired() { |
|
|
|
|
final long systemTime = Time.SYSTEM.milliseconds(); |
|
|
|
|
expect(innerStore.findSessions(KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime)) |
|
|
|
|
.andReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator())); |
|
|
|
|
when(innerStore.findSessions(KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime)) |
|
|
|
|
.thenReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator())); |
|
|
|
|
init(); |
|
|
|
|
|
|
|
|
|
final KeyValueIterator<Windowed<String>, String> iterator = store.findSessions(KEY, systemTime - RETENTION_PERIOD, systemTime); |
|
|
|
@ -445,8 +414,8 @@ public class MeteredSessionStoreTest {
@@ -445,8 +414,8 @@ public class MeteredSessionStoreTest {
|
|
|
|
|
@Test |
|
|
|
|
public void shouldReturnNoSessionsInBackwardOrderWhenFetchedKeyHasExpired() { |
|
|
|
|
final long systemTime = Time.SYSTEM.milliseconds(); |
|
|
|
|
expect(innerStore.backwardFindSessions(KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime)) |
|
|
|
|
.andReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator())); |
|
|
|
|
when(innerStore.backwardFindSessions(KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime)) |
|
|
|
|
.thenReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator())); |
|
|
|
|
init(); |
|
|
|
|
|
|
|
|
|
final KeyValueIterator<Windowed<String>, String> iterator = store.backwardFindSessions(KEY, systemTime - RETENTION_PERIOD, systemTime); |
|
|
|
@ -457,8 +426,8 @@ public class MeteredSessionStoreTest {
@@ -457,8 +426,8 @@ public class MeteredSessionStoreTest {
|
|
|
|
|
@Test |
|
|
|
|
public void shouldNotFindExpiredSessionRangeFromStore() { |
|
|
|
|
final long systemTime = Time.SYSTEM.milliseconds(); |
|
|
|
|
expect(innerStore.findSessions(KEY_BYTES, KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime)) |
|
|
|
|
.andReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator())); |
|
|
|
|
when(innerStore.findSessions(KEY_BYTES, KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime)) |
|
|
|
|
.thenReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator())); |
|
|
|
|
init(); |
|
|
|
|
|
|
|
|
|
final KeyValueIterator<Windowed<String>, String> iterator = store.findSessions(KEY, KEY, systemTime - RETENTION_PERIOD, systemTime); |
|
|
|
@ -469,8 +438,8 @@ public class MeteredSessionStoreTest {
@@ -469,8 +438,8 @@ public class MeteredSessionStoreTest {
|
|
|
|
|
@Test |
|
|
|
|
public void shouldNotFindExpiredSessionRangeInBackwardOrderFromStore() { |
|
|
|
|
final long systemTime = Time.SYSTEM.milliseconds(); |
|
|
|
|
expect(innerStore.backwardFindSessions(KEY_BYTES, KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime)) |
|
|
|
|
.andReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator())); |
|
|
|
|
when(innerStore.backwardFindSessions(KEY_BYTES, KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime)) |
|
|
|
|
.thenReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator())); |
|
|
|
|
init(); |
|
|
|
|
|
|
|
|
|
final KeyValueIterator<Windowed<String>, String> iterator = store.backwardFindSessions(KEY, KEY, systemTime - RETENTION_PERIOD, systemTime); |
|
|
|
@ -490,7 +459,7 @@ public class MeteredSessionStoreTest {
@@ -490,7 +459,7 @@ public class MeteredSessionStoreTest {
|
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldNotThrowNullPointerExceptionIfFetchSessionReturnsNull() { |
|
|
|
|
expect(innerStore.fetchSession(Bytes.wrap("a".getBytes()), 0, Long.MAX_VALUE)).andReturn(null); |
|
|
|
|
when(innerStore.fetchSession(Bytes.wrap("a".getBytes()), 0, Long.MAX_VALUE)).thenReturn(null); |
|
|
|
|
|
|
|
|
|
init(); |
|
|
|
|
assertNull(store.fetchSession("a", 0, Long.MAX_VALUE)); |
|
|
|
@ -598,8 +567,7 @@ public class MeteredSessionStoreTest {
@@ -598,8 +567,7 @@ public class MeteredSessionStoreTest {
|
|
|
|
|
public void shouldSetFlushListenerOnWrappedCachingStore() { |
|
|
|
|
final CachedSessionStore cachedSessionStore = mock(CachedSessionStore.class); |
|
|
|
|
|
|
|
|
|
expect(cachedSessionStore.setFlushListener(anyObject(CacheFlushListener.class), eq(false))).andReturn(true); |
|
|
|
|
replay(cachedSessionStore); |
|
|
|
|
when(cachedSessionStore.setFlushListener(any(CacheFlushListener.class), eq(false))).thenReturn(true); |
|
|
|
|
|
|
|
|
|
store = new MeteredSessionStore<>( |
|
|
|
|
cachedSessionStore, |
|
|
|
@ -608,8 +576,6 @@ public class MeteredSessionStoreTest {
@@ -608,8 +576,6 @@ public class MeteredSessionStoreTest {
|
|
|
|
|
Serdes.String(), |
|
|
|
|
new MockTime()); |
|
|
|
|
assertTrue(store.setFlushListener(null, false)); |
|
|
|
|
|
|
|
|
|
verify(cachedSessionStore); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
@ -619,27 +585,23 @@ public class MeteredSessionStoreTest {
@@ -619,27 +585,23 @@ public class MeteredSessionStoreTest {
|
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldRemoveMetricsOnClose() { |
|
|
|
|
innerStore.close(); |
|
|
|
|
expectLastCall(); |
|
|
|
|
doNothing().when(innerStore).close(); |
|
|
|
|
init(); // replays "inner"
|
|
|
|
|
|
|
|
|
|
// There's always a "count" metric registered
|
|
|
|
|
assertThat(storeMetrics(), not(empty())); |
|
|
|
|
store.close(); |
|
|
|
|
assertThat(storeMetrics(), empty()); |
|
|
|
|
verify(innerStore); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldRemoveMetricsEvenIfWrappedStoreThrowsOnClose() { |
|
|
|
|
innerStore.close(); |
|
|
|
|
expectLastCall().andThrow(new RuntimeException("Oops!")); |
|
|
|
|
doThrow(new RuntimeException("Oops!")).when(innerStore).close(); |
|
|
|
|
init(); // replays "inner"
|
|
|
|
|
|
|
|
|
|
assertThat(storeMetrics(), not(empty())); |
|
|
|
|
assertThrows(RuntimeException.class, store::close); |
|
|
|
|
assertThat(storeMetrics(), empty()); |
|
|
|
|
verify(innerStore); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private KafkaMetric metric(final String name) { |
|
|
|
|