Browse Source

KAFKA-9517: Fix default serdes with FK join (#8061)

During the KIP-213 implementation and verification, we neglected to test the
code path for falling back to default serdes if none are given in the topology.

Reviewer: Bill Bejeck <bbejeck@gmail.com>
pull/8093/head
John Roesler 5 years ago committed by GitHub
parent
commit
520a76155c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
  2. 10
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
  3. 4
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
  4. 23
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableDeserializer.java
  5. 23
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerializer.java
  6. 8
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
  7. 5
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
  8. 11
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java
  9. 33
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
  10. 2
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
  11. 35
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
  12. 10
      streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
  13. 10
      streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
  14. 1
      streams/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java
  15. 178
      streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java

10
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java

@ -20,8 +20,9 @@ import org.apache.kafka.common.header.Headers; @@ -20,8 +20,9 @@ import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import java.nio.ByteBuffer;
import java.util.Objects;
public class ChangedDeserializer<T> implements Deserializer<Change<T>> {
public class ChangedDeserializer<T> implements Deserializer<Change<T>>, WrappingNullableDeserializer<Change<T>, T> {
private static final int NEWFLAG_SIZE = 1;
@ -35,8 +36,11 @@ public class ChangedDeserializer<T> implements Deserializer<Change<T>> { @@ -35,8 +36,11 @@ public class ChangedDeserializer<T> implements Deserializer<Change<T>> {
return inner;
}
public void setInner(final Deserializer<T> inner) {
this.inner = inner;
@Override
public void setIfUnset(final Deserializer<T> defaultDeserializer) {
if (inner == null) {
inner = Objects.requireNonNull(defaultDeserializer, "defaultDeserializer cannot be null");
}
}
@Override

10
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java

@ -21,8 +21,9 @@ import org.apache.kafka.common.serialization.Serializer; @@ -21,8 +21,9 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.errors.StreamsException;
import java.nio.ByteBuffer;
import java.util.Objects;
public class ChangedSerializer<T> implements Serializer<Change<T>> {
public class ChangedSerializer<T> implements Serializer<Change<T>>, WrappingNullableSerializer<Change<T>, T> {
private static final int NEWFLAG_SIZE = 1;
@ -36,8 +37,11 @@ public class ChangedSerializer<T> implements Serializer<Change<T>> { @@ -36,8 +37,11 @@ public class ChangedSerializer<T> implements Serializer<Change<T>> {
return inner;
}
public void setInner(final Serializer<T> inner) {
this.inner = inner;
@Override
public void setIfUnset(final Serializer<T> defaultSerializer) {
if (inner == null) {
inner = Objects.requireNonNull(defaultSerializer, "defaultSerializer cannot be null");
}
}
/**

4
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java

@ -970,7 +970,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< @@ -970,7 +970,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
foreignKeyExtractor,
foreignKeySerde,
subscriptionTopicName,
valSerde.serializer(),
valSerde == null ? null : valSerde.serializer(),
leftJoin
),
renamed.suffixWithOrElseGet("-subscription-registration-processor", builder, SUBSCRIPTION_REGISTRATION)
@ -1073,7 +1073,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< @@ -1073,7 +1073,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final KTableValueGetterSupplier<K, V> primaryKeyValueGetter = valueGetterSupplier();
final SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> resolverProcessorSupplier = new SubscriptionResolverJoinProcessorSupplier<>(
primaryKeyValueGetter,
valueSerde().serializer(),
valSerde == null ? null : valSerde.serializer(),
joiner,
leftJoin
);

23
streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableDeserializer.java

@ -0,0 +1,23 @@ @@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
public interface WrappingNullableDeserializer<Outer, Inner> extends Deserializer<Outer> {
void setIfUnset(final Deserializer<Inner> defaultDeserializer);
}

23
streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerializer.java

@ -0,0 +1,23 @@ @@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serializer;
public interface WrappingNullableSerializer<Outer, Inner> extends Serializer<Outer> {
void setIfUnset(final Serializer<Inner> defaultSerializer);
}

8
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java

@ -36,10 +36,10 @@ public class CombinedKeySchema<KO, K> { @@ -36,10 +36,10 @@ public class CombinedKeySchema<KO, K> {
public CombinedKeySchema(final String serdeTopic, final Serde<KO> foreignKeySerde, final Serde<K> primaryKeySerde) {
this.serdeTopic = serdeTopic;
primaryKeySerializer = primaryKeySerde.serializer();
primaryKeyDeserializer = primaryKeySerde.deserializer();
foreignKeyDeserializer = foreignKeySerde.deserializer();
foreignKeySerializer = foreignKeySerde.serializer();
primaryKeySerializer = primaryKeySerde == null ? null : primaryKeySerde.serializer();
primaryKeyDeserializer = primaryKeySerde == null ? null : primaryKeySerde.deserializer();
foreignKeyDeserializer = foreignKeySerde == null ? null : foreignKeySerde.deserializer();
foreignKeySerializer = foreignKeySerde == null ? null : foreignKeySerde.serializer();
}
@SuppressWarnings("unchecked")

5
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java

@ -44,9 +44,9 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P @@ -44,9 +44,9 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
private final Function<V, KO> foreignKeyExtractor;
private final String repartitionTopicName;
private final Serializer<V> valueSerializer;
private final boolean leftJoin;
private Serializer<KO> foreignKeySerializer;
private Serializer<V> valueSerializer;
public ForeignJoinSubscriptionSendProcessorSupplier(final Function<V, KO> foreignKeyExtractor,
final Serde<KO> foreignKeySerde,
@ -77,6 +77,9 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P @@ -77,6 +77,9 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
if (foreignKeySerializer == null) {
foreignKeySerializer = (Serializer<KO>) context.keySerde().serializer();
}
if (valueSerializer == null) {
valueSerializer = (Serializer<V>) context.valueSerde().serializer();
}
droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(
Thread.currentThread().getName(),
context.taskId().toString(),

11
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java

@ -41,7 +41,7 @@ import org.apache.kafka.streams.state.internals.Murmur3; @@ -41,7 +41,7 @@ import org.apache.kafka.streams.state.internals.Murmur3;
*/
public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements ProcessorSupplier<K, SubscriptionResponseWrapper<VO>> {
private final KTableValueGetterSupplier<K, V> valueGetterSupplier;
private final Serializer<V> valueSerializer;
private final Serializer<V> constructionTimeValueSerializer;
private final ValueJoiner<V, VO, VR> joiner;
private final boolean leftJoin;
@ -50,7 +50,7 @@ public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements @@ -50,7 +50,7 @@ public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements
final ValueJoiner<V, VO, VR> joiner,
final boolean leftJoin) {
this.valueGetterSupplier = valueGetterSupplier;
this.valueSerializer = valueSerializer;
constructionTimeValueSerializer = valueSerializer;
this.joiner = joiner;
this.leftJoin = leftJoin;
}
@ -58,14 +58,19 @@ public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements @@ -58,14 +58,19 @@ public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements
@Override
public Processor<K, SubscriptionResponseWrapper<VO>> get() {
return new AbstractProcessor<K, SubscriptionResponseWrapper<VO>>() {
private Serializer<V> runtimeValueSerializer = constructionTimeValueSerializer;
private KTableValueGetter<K, V> valueGetter;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
valueGetter = valueGetterSupplier.get();
valueGetter.init(context);
if (runtimeValueSerializer == null) {
runtimeValueSerializer = (Serializer<V>) context.valueSerde().serializer();
}
}
@Override
@ -86,7 +91,7 @@ public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements @@ -86,7 +91,7 @@ public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements
final String dummySerializationTopic = context().topic() + "-join-resolver";
final long[] currentHash = currentValueWithTimestamp == null ?
null :
Murmur3.hash128(valueSerializer.serialize(dummySerializationTopic, currentValueWithTimestamp.value()));
Murmur3.hash128(runtimeValueSerializer.serialize(dummySerializationTopic, currentValueWithTimestamp.value()));
final long[] messageHash = value.getOriginalValueHash();

33
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java

@ -20,16 +20,19 @@ import org.apache.kafka.common.errors.UnsupportedVersionException; @@ -20,16 +20,19 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
import java.nio.ByteBuffer;
import java.util.Objects;
public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionResponseWrapper<V>> {
private final SubscriptionResponseWrapperSerializer<V> serializer;
private final SubscriptionResponseWrapperDeserializer<V> deserializer;
public SubscriptionResponseWrapperSerde(final Serde<V> foreignValueSerde) {
serializer = new SubscriptionResponseWrapperSerializer<>(foreignValueSerde.serializer());
deserializer = new SubscriptionResponseWrapperDeserializer<>(foreignValueSerde.deserializer());
serializer = new SubscriptionResponseWrapperSerializer<>(foreignValueSerde == null ? null : foreignValueSerde.serializer());
deserializer = new SubscriptionResponseWrapperDeserializer<>(foreignValueSerde == null ? null : foreignValueSerde.deserializer());
}
@Override
@ -42,13 +45,22 @@ public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionRe @@ -42,13 +45,22 @@ public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionRe
return deserializer;
}
private static final class SubscriptionResponseWrapperSerializer<V> implements Serializer<SubscriptionResponseWrapper<V>> {
private final Serializer<V> serializer;
private static final class SubscriptionResponseWrapperSerializer<V>
implements Serializer<SubscriptionResponseWrapper<V>>, WrappingNullableSerializer<SubscriptionResponseWrapper<V>, V> {
private Serializer<V> serializer;
private SubscriptionResponseWrapperSerializer(final Serializer<V> serializer) {
this.serializer = serializer;
}
@Override
public void setIfUnset(final Serializer<V> defaultSerializer) {
if (serializer == null) {
serializer = Objects.requireNonNull(defaultSerializer, "defaultSerializer cannot be null");
}
}
@Override
public byte[] serialize(final String topic, final SubscriptionResponseWrapper<V> data) {
//{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized data}
@ -81,13 +93,22 @@ public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionRe @@ -81,13 +93,22 @@ public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionRe
}
private static final class SubscriptionResponseWrapperDeserializer<V> implements Deserializer<SubscriptionResponseWrapper<V>> {
private final Deserializer<V> deserializer;
private static final class SubscriptionResponseWrapperDeserializer<V>
implements Deserializer<SubscriptionResponseWrapper<V>>, WrappingNullableDeserializer<SubscriptionResponseWrapper<V>, V> {
private Deserializer<V> deserializer;
private SubscriptionResponseWrapperDeserializer(final Deserializer<V> deserializer) {
this.deserializer = deserializer;
}
@Override
public void setIfUnset(final Deserializer<V> defaultDeserializer) {
if (deserializer == null) {
deserializer = Objects.requireNonNull(defaultDeserializer, "defaultDeserializer cannot be null");
}
}
@Override
public SubscriptionResponseWrapper<V> deserialize(final String topic, final byte[] data) {
//{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized data}

2
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java

@ -68,6 +68,8 @@ public class SubscriptionStoreReceiveProcessorSupplier<K, KO> @@ -68,6 +68,8 @@ public class SubscriptionStoreReceiveProcessorSupplier<K, KO>
internalProcessorContext.metrics()
);
store = internalProcessorContext.getStateStore(storeBuilder);
keySchema.init(context);
}
@Override

35
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java

@ -20,16 +20,19 @@ import org.apache.kafka.common.errors.UnsupportedVersionException; @@ -20,16 +20,19 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
import java.nio.ByteBuffer;
import java.util.Objects;
public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>> {
private final SubscriptionWrapperSerializer<K> serializer;
private final SubscriptionWrapperDeserializer<K> deserializer;
public SubscriptionWrapperSerde(final Serde<K> primaryKeySerde) {
serializer = new SubscriptionWrapperSerializer<>(primaryKeySerde.serializer());
deserializer = new SubscriptionWrapperDeserializer<>(primaryKeySerde.deserializer());
serializer = new SubscriptionWrapperSerializer<>(primaryKeySerde == null ? null : primaryKeySerde.serializer());
deserializer = new SubscriptionWrapperDeserializer<>(primaryKeySerde == null ? null : primaryKeySerde.deserializer());
}
@Override
@ -42,12 +45,22 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K> @@ -42,12 +45,22 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
return deserializer;
}
private static class SubscriptionWrapperSerializer<K> implements Serializer<SubscriptionWrapper<K>> {
private final Serializer<K> primaryKeySerializer;
public static class SubscriptionWrapperSerializer<K>
implements Serializer<SubscriptionWrapper<K>>, WrappingNullableSerializer<SubscriptionWrapper<K>, K> {
private Serializer<K> primaryKeySerializer;
SubscriptionWrapperSerializer(final Serializer<K> primaryKeySerializer) {
this.primaryKeySerializer = primaryKeySerializer;
}
@Override
public void setIfUnset(final Serializer<K> defaultSerializer) {
if (primaryKeySerializer == null) {
primaryKeySerializer = Objects.requireNonNull(defaultSerializer, "defaultSerializer cannot be null");
}
}
@Override
public byte[] serialize(final String topic, final SubscriptionWrapper<K> data) {
//{1-bit-isHashNull}{7-bits-version}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}
@ -81,12 +94,22 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K> @@ -81,12 +94,22 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
}
private static class SubscriptionWrapperDeserializer<K> implements Deserializer<SubscriptionWrapper<K>> {
private final Deserializer<K> primaryKeyDeserializer;
public static class SubscriptionWrapperDeserializer<K>
implements Deserializer<SubscriptionWrapper<K>>, WrappingNullableDeserializer<SubscriptionWrapper<K>, K> {
private Deserializer<K> primaryKeyDeserializer;
SubscriptionWrapperDeserializer(final Deserializer<K> primaryKeyDeserializer) {
this.primaryKeyDeserializer = primaryKeyDeserializer;
}
@Override
public void setIfUnset(final Deserializer<K> defaultDeserializer) {
if (primaryKeyDeserializer == null) {
primaryKeyDeserializer = Objects.requireNonNull(defaultDeserializer, "defaultDeserializer cannot be null");
}
}
@Override
public SubscriptionWrapper<K> deserialize(final String topic, final byte[] data) {
//{7-bits-version}{1-bit-isHashNull}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}

10
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java

@ -18,7 +18,7 @@ package org.apache.kafka.streams.processor.internals; @@ -18,7 +18,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.internals.ChangedSerializer;
import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopicNameExtractor;
@ -66,10 +66,10 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> { @@ -66,10 +66,10 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
valSerializer = (Serializer<V>) context.valueSerde().serializer();
}
// if value serializers are for {@code Change} values, set the inner serializer when necessary
if (valSerializer instanceof ChangedSerializer &&
((ChangedSerializer) valSerializer).inner() == null) {
((ChangedSerializer) valSerializer).setInner(context.valueSerde().serializer());
// if value serializers are internal wrapping serializers that may need to be given the default serializer
// then pass it the default one from the context
if (valSerializer instanceof WrappingNullableSerializer) {
((WrappingNullableSerializer) valSerializer).setIfUnset(context.valueSerde().serializer());
}
}

10
streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java

@ -19,7 +19,7 @@ package org.apache.kafka.streams.processor.internals; @@ -19,7 +19,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.streams.kstream.internals.ChangedDeserializer;
import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
@ -88,10 +88,10 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> { @@ -88,10 +88,10 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
this.valDeserializer = (Deserializer<V>) context.valueSerde().deserializer();
}
// if value deserializers are for {@code Change} values, set the inner deserializer when necessary
if (this.valDeserializer instanceof ChangedDeserializer &&
((ChangedDeserializer) this.valDeserializer).inner() == null) {
((ChangedDeserializer) this.valDeserializer).setInner(context.valueSerde().deserializer());
// if value deserializers are internal wrapping deserializers that may need to be given the default
// then pass it the default one from the context
if (valDeserializer instanceof WrappingNullableDeserializer) {
((WrappingNullableDeserializer) valDeserializer).setIfUnset(context.valueSerde().deserializer());
}
}

1
streams/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java

@ -39,6 +39,7 @@ import org.junit.runners.Suite; @@ -39,6 +39,7 @@ import org.junit.runners.Suite;
KTableKTableForeignKeyInnerJoinMultiIntegrationTest.class,
KTableKTableForeignKeyJoinIntegrationTest.class,
KTableKTableForeignKeyJoinMaterializationIntegrationTest.class,
KTableKTableForeignKeyJoinDefaultSerdeTest.class,
CombinedKeySchemaTest.class,
SubscriptionWrapperSerdeTest.class,
SubscriptionResponseWrapperSerdeTest.class,

178
streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java

@ -0,0 +1,178 @@ @@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.integration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import org.junit.Test;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
public class KTableKTableForeignKeyJoinDefaultSerdeTest {
@Test
public void shouldWorkWithDefaultSerdes() {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, String> aTable = builder.table("A");
final KTable<String, String> bTable = builder.table("B");
final KTable<String, String> fkJoinResult = aTable.join(
bTable,
value -> value.split("-")[0],
(aVal, bVal) -> "(" + aVal + "," + bVal + ")",
Materialized.as("asdf")
);
final KTable<String, String> finalJoinResult = aTable.join(
fkJoinResult,
(aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
);
finalJoinResult.toStream().to("output");
validateTopologyCanProcessData(builder);
}
@Test
public void shouldWorkWithDefaultAndConsumedSerdes() {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, String> aTable = builder.table("A", Consumed.with(Serdes.String(), Serdes.String()));
final KTable<String, String> bTable = builder.table("B");
final KTable<String, String> fkJoinResult = aTable.join(
bTable,
value -> value.split("-")[0],
(aVal, bVal) -> "(" + aVal + "," + bVal + ")",
Materialized.as("asdf")
);
final KTable<String, String> finalJoinResult = aTable.join(
fkJoinResult,
(aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
);
finalJoinResult.toStream().to("output");
validateTopologyCanProcessData(builder);
}
@Test
public void shouldWorkWithDefaultAndJoinResultSerdes() {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, String> aTable = builder.table("A");
final KTable<String, String> bTable = builder.table("B");
final KTable<String, String> fkJoinResult = aTable.join(
bTable,
value -> value.split("-")[0],
(aVal, bVal) -> "(" + aVal + "," + bVal + ")",
Materialized
.<String, String, KeyValueStore<Bytes, byte[]>>as("asdf")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String())
);
final KTable<String, String> finalJoinResult = aTable.join(
fkJoinResult,
(aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
);
finalJoinResult.toStream().to("output");
validateTopologyCanProcessData(builder);
}
@Test
public void shouldWorkWithDefaultAndEquiJoinResultSerdes() {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, String> aTable = builder.table("A");
final KTable<String, String> bTable = builder.table("B");
final KTable<String, String> fkJoinResult = aTable.join(
bTable,
value -> value.split("-")[0],
(aVal, bVal) -> "(" + aVal + "," + bVal + ")",
Materialized.as("asdf")
);
final KTable<String, String> finalJoinResult = aTable.join(
fkJoinResult,
(aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")",
Materialized.with(Serdes.String(), Serdes.String())
);
finalJoinResult.toStream().to("output");
validateTopologyCanProcessData(builder);
}
@Test
public void shouldWorkWithDefaultAndProducedSerdes() {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, String> aTable = builder.table("A");
final KTable<String, String> bTable = builder.table("B");
final KTable<String, String> fkJoinResult = aTable.join(
bTable,
value -> value.split("-")[0],
(aVal, bVal) -> "(" + aVal + "," + bVal + ")",
Materialized.as("asdf")
);
final KTable<String, String> finalJoinResult = aTable.join(
fkJoinResult,
(aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
);
finalJoinResult.toStream().to("output", Produced.with(Serdes.String(), Serdes.String()));
validateTopologyCanProcessData(builder);
}
private static void validateTopologyCanProcessData(final StreamsBuilder builder) {
final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), config)) {
final TestInputTopic<String, String> aTopic = topologyTestDriver.createInputTopic("A", new StringSerializer(), new StringSerializer());
final TestInputTopic<String, String> bTopic = topologyTestDriver.createInputTopic("B", new StringSerializer(), new StringSerializer());
final TestOutputTopic<String, String> output = topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
aTopic.pipeInput("a1", "b1-alpha");
bTopic.pipeInput("b1", "beta");
final Map<String, String> x = output.readKeyValuesToMap();
assertThat(x, is(Collections.singletonMap("a1", "(b1-alpha,(b1-alpha,beta))")));
}
}
}
Loading…
Cancel
Save