Browse Source

KAFKA-6560: [FOLLOW-UP] don't deserialize null byte array in window store fetch (#4665)

If the result of a fetch from a Window Store results in a null byte array we should return null rather than passing it to the serde to deserialize.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
pull/4669/head
Damian Guy 7 years ago committed by Guozhang Wang
parent
commit
989088f697
  1. 8
      streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
  2. 3
      streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
  3. 1
      streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
  4. 7
      streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
  5. 3
      streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
  6. 24
      streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
  7. 1
      streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
  8. 1
      streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
  9. 13
      streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
  10. 55
      streams/src/test/java/org/apache/kafka/streams/state/internals/SerdeThatDoesntHandleNull.java

8
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java

@ -106,15 +106,15 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto @@ -106,15 +106,15 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
@Override
public V fetch(final K key, final long timestamp) {
final long startNs = time.nanoseconds();
V ret;
try {
final byte[] result = inner.fetch(keyBytes(key), timestamp);
ret = serdes.valueFrom(result);
if (result == null) {
return null;
}
return serdes.valueFrom(result);
} finally {
metrics.recordLatency(this.fetchTime, startNs, time.nanoseconds());
}
return ret;
}
@Override

3
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java

@ -110,6 +110,9 @@ public class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateSto @@ -110,6 +110,9 @@ public class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
@Override
public V fetch(final K key, final long timestamp) {
final byte[] bytesValue = bytesStore.get(WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes));
if (bytesValue == null) {
return null;
}
return serdes.valueFrom(bytesValue);
}

1
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java

@ -81,6 +81,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest { @@ -81,6 +81,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
@After
public void after() {
super.after();
context.close();
}

7
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java

@ -28,7 +28,6 @@ import org.apache.kafka.streams.kstream.internals.Change; @@ -28,7 +28,6 @@ import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.MockProcessorContext;
@ -76,7 +75,7 @@ public class CachingSessionStoreTest { @@ -76,7 +75,7 @@ public class CachingSessionStoreTest {
Segments.segmentInterval(retention, numSegments)
);
cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
context = new MockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache);
context = new MockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic"));
cachingStore.init(context, cachingStore);
}
@ -87,10 +86,6 @@ public class CachingSessionStoreTest { @@ -87,10 +86,6 @@ public class CachingSessionStoreTest {
cachingStore.close();
}
private Bytes bytesKey(final String key) {
return Bytes.wrap(key.getBytes());
}
@Test
public void shouldPutFetchFromCache() {
cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes());

3
streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java

@ -69,9 +69,8 @@ public class ChangeLoggingSessionBytesStoreTest { @@ -69,9 +69,8 @@ public class ChangeLoggingSessionBytesStoreTest {
private final Windowed<Bytes> key1 = new Windowed<>(bytesKey, new SessionWindow(0, 0));
@Before
public void setUp() throws Exception {
public void setUp() {
store = new ChangeLoggingSessionBytesStore(inner);
}
private void init() {

24
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java

@ -40,13 +40,14 @@ import java.util.HashSet; @@ -40,13 +40,14 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class MeteredWindowStoreTest {
private MockProcessorContext context;
@SuppressWarnings("unchecked")
private final WindowStore<Bytes, byte[]> innerStoreMock = EasyMock.createNiceMock(WindowStore.class);
private final MeteredWindowStore<String, String> store = new MeteredWindowStore<>(innerStoreMock, "scope", new MockTime(), Serdes.String(), Serdes.String());
private final MeteredWindowStore<String, String> store = new MeteredWindowStore<>(innerStoreMock, "scope", new MockTime(), Serdes.String(), new SerdeThatDoesntHandleNull());
private final Set<String> latencyRecorded = new HashSet<>();
private final Set<String> throughputRecorded = new HashSet<>();
@ -118,7 +119,7 @@ public class MeteredWindowStoreTest { @@ -118,7 +119,7 @@ public class MeteredWindowStoreTest {
}
@Test
public void shouldRecordRestoreLatencyOnInit() throws Exception {
public void shouldRecordRestoreLatencyOnInit() {
innerStoreMock.init(context, store);
EasyMock.expectLastCall();
EasyMock.replay(innerStoreMock);
@ -127,7 +128,7 @@ public class MeteredWindowStoreTest { @@ -127,7 +128,7 @@ public class MeteredWindowStoreTest {
}
@Test
public void shouldRecordPutLatency() throws Exception {
public void shouldRecordPutLatency() {
final byte[] bytes = "a".getBytes();
innerStoreMock.put(EasyMock.eq(Bytes.wrap(bytes)), EasyMock.<byte[]>anyObject(), EasyMock.eq(context.timestamp()));
EasyMock.expectLastCall();
@ -140,7 +141,7 @@ public class MeteredWindowStoreTest { @@ -140,7 +141,7 @@ public class MeteredWindowStoreTest {
}
@Test
public void shouldRecordFetchLatency() throws Exception {
public void shouldRecordFetchLatency() {
EasyMock.expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1)).andReturn(KeyValueIterators.<byte[]>emptyWindowStoreIterator());
EasyMock.replay(innerStoreMock);
@ -151,7 +152,7 @@ public class MeteredWindowStoreTest { @@ -151,7 +152,7 @@ public class MeteredWindowStoreTest {
}
@Test
public void shouldRecordFetchRangeLatency() throws Exception {
public void shouldRecordFetchRangeLatency() {
EasyMock.expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)).andReturn(KeyValueIterators.<Windowed<Bytes>, byte[]>emptyIterator());
EasyMock.replay(innerStoreMock);
@ -163,7 +164,7 @@ public class MeteredWindowStoreTest { @@ -163,7 +164,7 @@ public class MeteredWindowStoreTest {
@Test
public void shouldRecordFlushLatency() throws Exception {
public void shouldRecordFlushLatency() {
innerStoreMock.flush();
EasyMock.expectLastCall();
EasyMock.replay(innerStoreMock);
@ -176,7 +177,7 @@ public class MeteredWindowStoreTest { @@ -176,7 +177,7 @@ public class MeteredWindowStoreTest {
@Test
public void shouldCloseUnderlyingStore() throws Exception {
public void shouldCloseUnderlyingStore() {
innerStoreMock.close();
EasyMock.expectLastCall();
EasyMock.replay(innerStoreMock);
@ -187,4 +188,13 @@ public class MeteredWindowStoreTest { @@ -187,4 +188,13 @@ public class MeteredWindowStoreTest {
}
@Test
public void shouldNotExceptionIfFetchReturnsNull() {
EasyMock.expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 0)).andReturn(null);
EasyMock.replay(innerStoreMock);
store.init(context, store);
assertNull(store.fetch("a", 0));
}
}

1
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java

@ -51,7 +51,6 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest { @@ -51,7 +51,6 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
}
public static class TheRocksDbConfigSetter implements RocksDBConfigSetter {
static boolean called = false;
@Override

1
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java

@ -79,6 +79,7 @@ public class RocksDBStoreTest { @@ -79,6 +79,7 @@ public class RocksDBStoreTest {
@After
public void tearDown() {
rocksDBStore.close();
context.close();
}
@Test

13
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java

@ -795,6 +795,19 @@ public class RocksDBWindowStoreTest { @@ -795,6 +795,19 @@ public class RocksDBWindowStoreTest {
windowStore.fetch(1, null, 1L, 2L);
}
@Test
public void shouldNoNullPointerWhenSerdeDoesntHandleNull() {
windowStore = new RocksDBWindowStore<>(
new RocksDBSegmentedBytesStore(windowName, retentionPeriod, numSegments, new WindowKeySchema()),
Serdes.Integer(),
new SerdeThatDoesntHandleNull(),
false,
windowSize);
windowStore.init(context, windowStore);
assertNull(windowStore.fetch(1, 0));
}
@Test
public void shouldFetchAndIterateOverExactBinaryKeys() {
final WindowStore<Bytes, String> windowStore = Stores.windowStoreBuilder(

55
streams/src/test/java/org/apache/kafka/streams/state/internals/SerdeThatDoesntHandleNull.java

@ -0,0 +1,55 @@ @@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Map;
class SerdeThatDoesntHandleNull implements Serde<String> {
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
}
@Override
public void close() {
}
@Override
public Serializer<String> serializer() {
return new StringSerializer();
}
@Override
public Deserializer<String> deserializer() {
return new StringDeserializer() {
@Override
public String deserialize(final String topic, final byte[] data) {
if (data == null) {
throw new NullPointerException();
}
return super.deserialize(topic, data);
}
};
}
}
Loading…
Cancel
Save