Browse Source

KAFKA-5956; use serdes from materialized in table and globalTable

The new overloads `StreamBuilder.table(String, Materialized)` and `StreamsBuilder.globalTable(String, Materialized)` need to set the serdes from `Materialized` on the internal `Consumed` instance that is created, otherwise the defaults will be used and may result in serialization errors

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3936 from dguy/table-materialized
pull/3936/merge
Damian Guy 7 years ago
parent
commit
125d8d6f70
  1. 10
      streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
  2. 53
      streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java

10
streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java

@ -301,8 +301,10 @@ public class StreamsBuilder { @@ -301,8 +301,10 @@ public class StreamsBuilder {
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(topic, "topic can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
return internalStreamsBuilder.table(topic,
new ConsumedInternal<K, V>(),
new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(),
materializedInternal.valueSerde())),
new MaterializedInternal<>(materialized));
}
@ -429,9 +431,11 @@ public class StreamsBuilder { @@ -429,9 +431,11 @@ public class StreamsBuilder {
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(topic, "topic can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
return internalStreamsBuilder.globalTable(topic,
new ConsumedInternal<K, V>(),
new MaterializedInternal<>(materialized));
new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(),
materializedInternal.valueSerde())),
materializedInternal);
}

53
streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java

@ -16,21 +16,31 @@ @@ -16,21 +16,31 @@
*/
package org.apache.kafka.streams;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.Rule;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
public class StreamsBuilderTest {
@ -108,6 +118,49 @@ public class StreamsBuilderTest { @@ -108,6 +118,49 @@ public class StreamsBuilderTest {
assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
}
@Test
public void shouldUseSerdesDefinedInMaterializedToConsumeTable() {
final Map<Long, String> results = new HashMap<>();
final String topic = "topic";
final ForeachAction<Long, String> action = new ForeachAction<Long, String>() {
@Override
public void apply(final Long key, final String value) {
results.put(key, value);
}
};
builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store")
.withKeySerde(Serdes.Long())
.withValueSerde(Serdes.String()))
.toStream().foreach(action);
driver.setUp(builder, TestUtils.tempDirectory());
driver.setTime(0L);
driver.process(topic, 1L, "value1");
driver.process(topic, 2L, "value2");
driver.flushState();
final KeyValueStore<Long, String> store = (KeyValueStore) driver.allStateStores().get("store");
assertThat(store.get(1L), equalTo("value1"));
assertThat(store.get(2L), equalTo("value2"));
assertThat(results.get(1L), equalTo("value1"));
assertThat(results.get(2L), equalTo("value2"));
}
@Test
public void shouldUseSerdesDefinedInMaterializedToConsumeGlobalTable() {
final String topic = "topic";
builder.globalTable(topic, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store")
.withKeySerde(Serdes.Long())
.withValueSerde(Serdes.String()));
driver.setUp(builder, TestUtils.tempDirectory());
driver.setTime(0L);
driver.process(topic, 1L, "value1");
driver.process(topic, 2L, "value2");
driver.flushState();
final KeyValueStore<Long, String> store = (KeyValueStore) driver.allStateStores().get("store");
assertThat(store.get(1L), equalTo("value1"));
assertThat(store.get(2L), equalTo("value2"));
}
@Test(expected = TopologyException.class)
public void shouldThrowExceptionWhenNoTopicPresent() throws Exception {
builder.stream(Collections.<String>emptyList());

Loading…
Cancel
Save