Browse Source

KAFKA-5932; Avoid call to fetchPrevious in FlushListeners

Author: Bill Bejeck <bill@confluent.io>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Damian Guy <damian.guy@gmail.com>

Closes #3978 from bbejeck/KAFKA-5932_no_fetch_previous_when_no_old_values_returned
pull/3920/merge
Bill Bejeck 7 years ago committed by Damian Guy
parent
commit
36556b8041
  1. 2
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
  2. 4
      streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java
  3. 9
      streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
  4. 7
      streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
  5. 9
      streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
  6. 17
      streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
  7. 51
      streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
  8. 14
      streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java

2
streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java

@ -42,7 +42,7 @@ class TupleForwarder<K, V> { @@ -42,7 +42,7 @@ class TupleForwarder<K, V> {
this.context = context;
this.sendOldValues = sendOldValues;
if (this.cachedStateStore != null) {
cachedStateStore.setFlushListener(flushListener);
cachedStateStore.setFlushListener(flushListener, sendOldValues);
}
}

4
streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java

@ -23,6 +23,8 @@ public interface CachedStateStore<K, V> { @@ -23,6 +23,8 @@ public interface CachedStateStore<K, V> {
* Set the {@link CacheFlushListener} to be notified when entries are flushed from the
* cache to the underlying {@link org.apache.kafka.streams.processor.StateStore}
* @param listener
* @param sendOldValues
*/
void setFlushListener(final CacheFlushListener<K, V> listener);
void setFlushListener(final CacheFlushListener<K, V> listener,
final boolean sendOldValues);
}

9
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java

@ -38,6 +38,7 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im @@ -38,6 +38,7 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
private final Serde<K> keySerde;
private final Serde<V> valueSerde;
private CacheFlushListener<K, V> flushListener;
private boolean sendOldValues;
private String cacheName;
private ThreadCache cache;
private InternalProcessorContext context;
@ -87,9 +88,10 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im @@ -87,9 +88,10 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
context.setRecordContext(entry.recordContext());
if (flushListener != null) {
final V oldValue = sendOldValues ? serdes.valueFrom(underlying.get(entry.key())) : null;
flushListener.apply(serdes.keyFrom(entry.key().get()),
serdes.valueFrom(entry.newValue()),
serdes.valueFrom(underlying.get(entry.key())));
oldValue);
}
underlying.put(entry.key(), entry.newValue());
@ -98,8 +100,11 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im @@ -98,8 +100,11 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
}
}
public void setFlushListener(final CacheFlushListener<K, V> flushListener) {
public void setFlushListener(final CacheFlushListener<K, V> flushListener,
final boolean sendOldValues) {
this.flushListener = flushListener;
this.sendOldValues = sendOldValues;
}
@Override

7
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java

@ -47,6 +47,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i @@ -47,6 +47,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
private StateSerdes<K, AGG> serdes;
private InternalProcessorContext context;
private CacheFlushListener<Windowed<K>, AGG> flushListener;
private boolean sendOldValues;
private String topic;
CachingSessionStore(final SessionStore<Bytes, byte[]> bytesStore,
@ -170,7 +171,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i @@ -170,7 +171,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
final Bytes rawKey = Bytes.wrap(serdes.rawKey(key.key()));
if (flushListener != null) {
final AGG newValue = serdes.valueFrom(entry.newValue());
final AGG oldValue = fetchPrevious(rawKey, key.window());
final AGG oldValue = newValue == null || sendOldValues ? fetchPrevious(rawKey, key.window()) : null;
if (!(newValue == null && oldValue == null)) {
flushListener.apply(key, newValue, oldValue);
}
@ -202,8 +203,10 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i @@ -202,8 +203,10 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
bytesStore.close();
}
public void setFlushListener(CacheFlushListener<Windowed<K>, AGG> flushListener) {
public void setFlushListener(final CacheFlushListener<Windowed<K>, AGG> flushListener,
final boolean sendOldValues) {
this.flushListener = flushListener;
this.sendOldValues = sendOldValues;
}
}

9
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java

@ -49,6 +49,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl @@ -49,6 +49,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
private StateSerdes<K, V> serdes;
private StateSerdes<Bytes, byte[]> bytesSerdes;
private CacheFlushListener<Windowed<K>, V> flushListener;
private boolean sendOldValues;
private final SegmentedCacheFunction cacheFunction;
CachingWindowStore(final WindowStore<Bytes, byte[]> underlying,
@ -110,16 +111,20 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl @@ -110,16 +111,20 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
final RecordContext current = context.recordContext();
context.setRecordContext(entry.recordContext());
try {
final V oldValue = sendOldValues ? fetchPrevious(key, windowedKey.window().start()) : null;
flushListener.apply(windowedKey,
serdes.valueFrom(entry.newValue()), fetchPrevious(key, windowedKey.window().start()));
serdes.valueFrom(entry.newValue()), oldValue);
} finally {
context.setRecordContext(current);
}
}
}
public void setFlushListener(CacheFlushListener<Windowed<K>, V> flushListener) {
public void setFlushListener(final CacheFlushListener<Windowed<K>, V> flushListener,
final boolean sendOldValues) {
this.flushListener = flushListener;
this.sendOldValues = sendOldValues;
}
@Override

17
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java

@ -50,8 +50,8 @@ import static org.junit.Assert.assertFalse; @@ -50,8 +50,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
@ -69,7 +69,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest { @@ -69,7 +69,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
underlyingStore = new InMemoryKeyValueStore<>(storeName, Serdes.Bytes(), Serdes.ByteArray());
cacheFlushListener = new CacheFlushListenerStub<>();
store = new CachingKeyValueStore<>(underlyingStore, Serdes.String(), Serdes.String());
store.setFlushListener(cacheFlushListener);
store.setFlushListener(cacheFlushListener, false);
cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, new MockStreamsMetrics(new Metrics()));
context = new MockProcessorContext(null, null, null, (RecordCollector) null, cache);
topic = "topic";
@ -103,7 +103,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest { @@ -103,7 +103,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
final CacheFlushListenerStub<K, V> cacheFlushListener = new CacheFlushListenerStub<>();
final CachedStateStore inner = (CachedStateStore) ((WrappedStateStore) store).wrappedStore();
inner.setFlushListener(cacheFlushListener);
inner.setFlushListener(cacheFlushListener, false);
store.init(context, store);
return store;
}
@ -152,6 +152,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest { @@ -152,6 +152,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
@Test
public void shouldForwardOldValuesWhenEnabled() {
store.setFlushListener(cacheFlushListener, true);
store.put(bytesKey("1"), bytesValue("a"));
store.flush();
store.put(bytesKey("1"), bytesValue("b"));
@ -160,6 +161,16 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest { @@ -160,6 +161,16 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
assertEquals("a", cacheFlushListener.forwarded.get("1").oldValue);
}
@Test
public void shouldNotForwardOldValuesWhenDisabled() {
store.put(bytesKey("1"), bytesValue("a"));
store.flush();
store.put(bytesKey("1"), bytesValue("b"));
store.flush();
assertEquals("b", cacheFlushListener.forwarded.get("1").newValue);
assertNull(cacheFlushListener.forwarded.get("1").oldValue);
}
@Test
public void shouldIterateAllStoredItems() throws IOException {
int items = addItemsToCache();

51
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java

@ -237,7 +237,7 @@ public class CachingSessionStoreTest { @@ -237,7 +237,7 @@ public class CachingSessionStoreTest {
public void apply(final Windowed<String> key, final String newValue, final String oldValue) {
flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue)));
}
});
}, true);
cachingStore.put(a, "1".getBytes());
cachingStore.flush();
@ -253,6 +253,55 @@ public class CachingSessionStoreTest { @@ -253,6 +253,55 @@ public class CachingSessionStoreTest {
KeyValue.pair(aDeserialized, new Change<>(null, "2"))));
}
@Test
public void shouldForwardChangedValuesDuringFlushWhenSendOldValuesDisabledNewRecordIsNull() {
final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(0, 0));
final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(0, 0));
final List<KeyValue<Windowed<String>, Change<String>>> flushed = new ArrayList<>();
cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>, String>() {
@Override
public void apply(final Windowed<String> key, final String newValue, final String oldValue) {
flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue)));
}
}, false);
cachingStore.put(a, "1".getBytes());
cachingStore.flush();
cachingStore.put(a, "2".getBytes());
cachingStore.flush();
cachingStore.remove(a);
cachingStore.flush();
assertEquals(flushed, Arrays.asList(KeyValue.pair(aDeserialized, new Change<>("1", null)),
KeyValue.pair(aDeserialized, new Change<>("2", null)),
KeyValue.pair(aDeserialized, new Change<>(null, "2"))));
}
@Test
public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() {
final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(0, 0));
final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(0, 0));
final List<KeyValue<Windowed<String>, Change<String>>> flushed = new ArrayList<>();
cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>, String>() {
@Override
public void apply(final Windowed<String> key, final String newValue, final String oldValue) {
flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue)));
}
}, false);
cachingStore.put(a, "1".getBytes());
cachingStore.flush();
cachingStore.put(a, "2".getBytes());
cachingStore.flush();
assertEquals(flushed, Arrays.asList(KeyValue.pair(aDeserialized, new Change<>("1", null)),
KeyValue.pair(aDeserialized, new Change<>("2", null))));
}
@Test
public void shouldClearNamespaceCacheOnClose() {
final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(0, 0));

14
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java

@ -77,7 +77,7 @@ public class CachingWindowStoreTest { @@ -77,7 +77,7 @@ public class CachingWindowStoreTest {
Serdes.String(),
WINDOW_SIZE,
Segments.segmentInterval(retention, numSegments));
cachingStore.setFlushListener(cacheListener);
cachingStore.setFlushListener(cacheListener, false);
cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
topic = "topic";
context = new MockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache);
@ -154,6 +154,7 @@ public class CachingWindowStoreTest { @@ -154,6 +154,7 @@ public class CachingWindowStoreTest {
@Test
public void shouldForwardOldValuesWhenEnabled() {
cachingStore.setFlushListener(cacheListener, true);
final Windowed<String> windowedKey = new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
cachingStore.put(bytesKey("1"), bytesValue("a"));
cachingStore.flush();
@ -163,6 +164,17 @@ public class CachingWindowStoreTest { @@ -163,6 +164,17 @@ public class CachingWindowStoreTest {
assertEquals("a", cacheListener.forwarded.get(windowedKey).oldValue);
}
@Test
public void shouldForwardOldValuesWhenDisabled() {
final Windowed<String> windowedKey = new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
cachingStore.put(bytesKey("1"), bytesValue("a"));
cachingStore.flush();
cachingStore.put(bytesKey("1"), bytesValue("b"));
cachingStore.flush();
assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue);
assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
}
@Test
public void shouldForwardDirtyItemToListenerWhenEvicted() throws IOException {
int numRecords = addItemsToCache();

Loading…
Cancel
Save