diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java index 36f77b81b23..90d5882887f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java @@ -20,8 +20,9 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Deserializer; import java.nio.ByteBuffer; +import java.util.Objects; -public class ChangedDeserializer implements Deserializer> { +public class ChangedDeserializer implements Deserializer>, WrappingNullableDeserializer, T> { private static final int NEWFLAG_SIZE = 1; @@ -35,8 +36,11 @@ public class ChangedDeserializer implements Deserializer> { return inner; } - public void setInner(final Deserializer inner) { - this.inner = inner; + @Override + public void setIfUnset(final Deserializer defaultDeserializer) { + if (inner == null) { + inner = Objects.requireNonNull(defaultDeserializer, "defaultDeserializer cannot be null"); + } } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java index bfd0afa1b10..551d9483922 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java @@ -21,8 +21,9 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.errors.StreamsException; import java.nio.ByteBuffer; +import java.util.Objects; -public class ChangedSerializer implements Serializer> { +public class ChangedSerializer implements Serializer>, WrappingNullableSerializer, T> { private static final int NEWFLAG_SIZE = 1; @@ -36,8 +37,11 @@ public class ChangedSerializer implements Serializer> { return inner; } - public void setInner(final Serializer inner) { - this.inner = inner; + @Override + public void setIfUnset(final Serializer defaultSerializer) { + if (inner == null) { + inner = Objects.requireNonNull(defaultSerializer, "defaultSerializer cannot be null"); + } } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 38e5c49198f..f5c5aece072 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -970,7 +970,7 @@ public class KTableImpl extends AbstractStream implements KTable< foreignKeyExtractor, foreignKeySerde, subscriptionTopicName, - valSerde.serializer(), + valSerde == null ? null : valSerde.serializer(), leftJoin ), renamed.suffixWithOrElseGet("-subscription-registration-processor", builder, SUBSCRIPTION_REGISTRATION) @@ -1073,7 +1073,7 @@ public class KTableImpl extends AbstractStream implements KTable< final KTableValueGetterSupplier primaryKeyValueGetter = valueGetterSupplier(); final SubscriptionResolverJoinProcessorSupplier resolverProcessorSupplier = new SubscriptionResolverJoinProcessorSupplier<>( primaryKeyValueGetter, - valueSerde().serializer(), + valSerde == null ? null : valSerde.serializer(), joiner, leftJoin ); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableDeserializer.java new file mode 100644 index 00000000000..a57e9a15315 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableDeserializer.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Deserializer; + +public interface WrappingNullableDeserializer extends Deserializer { + void setIfUnset(final Deserializer defaultDeserializer); +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerializer.java new file mode 100644 index 00000000000..2d28e52db2b --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerializer.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Serializer; + +public interface WrappingNullableSerializer extends Serializer { + void setIfUnset(final Serializer defaultSerializer); +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java index 8abe583b0c0..7cda404e01d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java @@ -36,10 +36,10 @@ public class CombinedKeySchema { public CombinedKeySchema(final String serdeTopic, final Serde foreignKeySerde, final Serde primaryKeySerde) { this.serdeTopic = serdeTopic; - primaryKeySerializer = primaryKeySerde.serializer(); - primaryKeyDeserializer = primaryKeySerde.deserializer(); - foreignKeyDeserializer = foreignKeySerde.deserializer(); - foreignKeySerializer = foreignKeySerde.serializer(); + primaryKeySerializer = primaryKeySerde == null ? null : primaryKeySerde.serializer(); + primaryKeyDeserializer = primaryKeySerde == null ? null : primaryKeySerde.deserializer(); + foreignKeyDeserializer = foreignKeySerde == null ? null : foreignKeySerde.deserializer(); + foreignKeySerializer = foreignKeySerde == null ? null : foreignKeySerde.serializer(); } @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java index 00815f92400..06bd7d25ea5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java @@ -44,9 +44,9 @@ public class ForeignJoinSubscriptionSendProcessorSupplier implements P private final Function foreignKeyExtractor; private final String repartitionTopicName; - private final Serializer valueSerializer; private final boolean leftJoin; private Serializer foreignKeySerializer; + private Serializer valueSerializer; public ForeignJoinSubscriptionSendProcessorSupplier(final Function foreignKeyExtractor, final Serde foreignKeySerde, @@ -77,6 +77,9 @@ public class ForeignJoinSubscriptionSendProcessorSupplier implements P if (foreignKeySerializer == null) { foreignKeySerializer = (Serializer) context.keySerde().serializer(); } + if (valueSerializer == null) { + valueSerializer = (Serializer) context.valueSerde().serializer(); + } droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor( Thread.currentThread().getName(), context.taskId().toString(), diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java index a188f15062c..8fa77aa1671 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java @@ -41,7 +41,7 @@ import org.apache.kafka.streams.state.internals.Murmur3; */ public class SubscriptionResolverJoinProcessorSupplier implements ProcessorSupplier> { private final KTableValueGetterSupplier valueGetterSupplier; - private final Serializer valueSerializer; + private final Serializer constructionTimeValueSerializer; private final ValueJoiner joiner; private final boolean leftJoin; @@ -50,7 +50,7 @@ public class SubscriptionResolverJoinProcessorSupplier implements final ValueJoiner joiner, final boolean leftJoin) { this.valueGetterSupplier = valueGetterSupplier; - this.valueSerializer = valueSerializer; + constructionTimeValueSerializer = valueSerializer; this.joiner = joiner; this.leftJoin = leftJoin; } @@ -58,14 +58,19 @@ public class SubscriptionResolverJoinProcessorSupplier implements @Override public Processor> get() { return new AbstractProcessor>() { + private Serializer runtimeValueSerializer = constructionTimeValueSerializer; private KTableValueGetter valueGetter; + @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { super.init(context); valueGetter = valueGetterSupplier.get(); valueGetter.init(context); + if (runtimeValueSerializer == null) { + runtimeValueSerializer = (Serializer) context.valueSerde().serializer(); + } } @Override @@ -86,7 +91,7 @@ public class SubscriptionResolverJoinProcessorSupplier implements final String dummySerializationTopic = context().topic() + "-join-resolver"; final long[] currentHash = currentValueWithTimestamp == null ? null : - Murmur3.hash128(valueSerializer.serialize(dummySerializationTopic, currentValueWithTimestamp.value())); + Murmur3.hash128(runtimeValueSerializer.serialize(dummySerializationTopic, currentValueWithTimestamp.value())); final long[] messageHash = value.getOriginalValueHash(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java index 4277f9a8fd7..31317c500df 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java @@ -20,16 +20,19 @@ import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer; +import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer; import java.nio.ByteBuffer; +import java.util.Objects; public class SubscriptionResponseWrapperSerde implements Serde> { private final SubscriptionResponseWrapperSerializer serializer; private final SubscriptionResponseWrapperDeserializer deserializer; public SubscriptionResponseWrapperSerde(final Serde foreignValueSerde) { - serializer = new SubscriptionResponseWrapperSerializer<>(foreignValueSerde.serializer()); - deserializer = new SubscriptionResponseWrapperDeserializer<>(foreignValueSerde.deserializer()); + serializer = new SubscriptionResponseWrapperSerializer<>(foreignValueSerde == null ? null : foreignValueSerde.serializer()); + deserializer = new SubscriptionResponseWrapperDeserializer<>(foreignValueSerde == null ? null : foreignValueSerde.deserializer()); } @Override @@ -42,13 +45,22 @@ public class SubscriptionResponseWrapperSerde implements Serde implements Serializer> { - private final Serializer serializer; + private static final class SubscriptionResponseWrapperSerializer + implements Serializer>, WrappingNullableSerializer, V> { + + private Serializer serializer; private SubscriptionResponseWrapperSerializer(final Serializer serializer) { this.serializer = serializer; } + @Override + public void setIfUnset(final Serializer defaultSerializer) { + if (serializer == null) { + serializer = Objects.requireNonNull(defaultSerializer, "defaultSerializer cannot be null"); + } + } + @Override public byte[] serialize(final String topic, final SubscriptionResponseWrapper data) { //{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized data} @@ -81,13 +93,22 @@ public class SubscriptionResponseWrapperSerde implements Serde implements Deserializer> { - private final Deserializer deserializer; + private static final class SubscriptionResponseWrapperDeserializer + implements Deserializer>, WrappingNullableDeserializer, V> { + + private Deserializer deserializer; private SubscriptionResponseWrapperDeserializer(final Deserializer deserializer) { this.deserializer = deserializer; } + @Override + public void setIfUnset(final Deserializer defaultDeserializer) { + if (deserializer == null) { + deserializer = Objects.requireNonNull(defaultDeserializer, "defaultDeserializer cannot be null"); + } + } + @Override public SubscriptionResponseWrapper deserialize(final String topic, final byte[] data) { //{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized data} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java index 9cbeaddc78f..61fb1c13347 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java @@ -68,6 +68,8 @@ public class SubscriptionStoreReceiveProcessorSupplier internalProcessorContext.metrics() ); store = internalProcessorContext.getStateStore(storeBuilder); + + keySchema.init(context); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java index ae53ba8b349..1cb32935fbe 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java @@ -20,16 +20,19 @@ import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer; +import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer; import java.nio.ByteBuffer; +import java.util.Objects; public class SubscriptionWrapperSerde implements Serde> { private final SubscriptionWrapperSerializer serializer; private final SubscriptionWrapperDeserializer deserializer; public SubscriptionWrapperSerde(final Serde primaryKeySerde) { - serializer = new SubscriptionWrapperSerializer<>(primaryKeySerde.serializer()); - deserializer = new SubscriptionWrapperDeserializer<>(primaryKeySerde.deserializer()); + serializer = new SubscriptionWrapperSerializer<>(primaryKeySerde == null ? null : primaryKeySerde.serializer()); + deserializer = new SubscriptionWrapperDeserializer<>(primaryKeySerde == null ? null : primaryKeySerde.deserializer()); } @Override @@ -42,12 +45,22 @@ public class SubscriptionWrapperSerde implements Serde return deserializer; } - private static class SubscriptionWrapperSerializer implements Serializer> { - private final Serializer primaryKeySerializer; + public static class SubscriptionWrapperSerializer + implements Serializer>, WrappingNullableSerializer, K> { + + private Serializer primaryKeySerializer; + SubscriptionWrapperSerializer(final Serializer primaryKeySerializer) { this.primaryKeySerializer = primaryKeySerializer; } + @Override + public void setIfUnset(final Serializer defaultSerializer) { + if (primaryKeySerializer == null) { + primaryKeySerializer = Objects.requireNonNull(defaultSerializer, "defaultSerializer cannot be null"); + } + } + @Override public byte[] serialize(final String topic, final SubscriptionWrapper data) { //{1-bit-isHashNull}{7-bits-version}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized} @@ -81,12 +94,22 @@ public class SubscriptionWrapperSerde implements Serde } - private static class SubscriptionWrapperDeserializer implements Deserializer> { - private final Deserializer primaryKeyDeserializer; + public static class SubscriptionWrapperDeserializer + implements Deserializer>, WrappingNullableDeserializer, K> { + + private Deserializer primaryKeyDeserializer; + SubscriptionWrapperDeserializer(final Deserializer primaryKeyDeserializer) { this.primaryKeyDeserializer = primaryKeyDeserializer; } + @Override + public void setIfUnset(final Deserializer defaultDeserializer) { + if (primaryKeyDeserializer == null) { + primaryKeyDeserializer = Objects.requireNonNull(defaultDeserializer, "defaultDeserializer cannot be null"); + } + } + @Override public SubscriptionWrapper deserialize(final String topic, final byte[] data) { //{7-bits-version}{1-bit-isHashNull}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index b94291b3503..e3333be03e0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -18,7 +18,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.kstream.internals.ChangedSerializer; +import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TopicNameExtractor; @@ -66,10 +66,10 @@ public class SinkNode extends ProcessorNode { valSerializer = (Serializer) context.valueSerde().serializer(); } - // if value serializers are for {@code Change} values, set the inner serializer when necessary - if (valSerializer instanceof ChangedSerializer && - ((ChangedSerializer) valSerializer).inner() == null) { - ((ChangedSerializer) valSerializer).setInner(context.valueSerde().serializer()); + // if value serializers are internal wrapping serializers that may need to be given the default serializer + // then pass it the default one from the context + if (valSerializer instanceof WrappingNullableSerializer) { + ((WrappingNullableSerializer) valSerializer).setIfUnset(context.valueSerde().serializer()); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java index 33d08b1171f..853520a5e27 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java @@ -19,7 +19,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.streams.kstream.internals.ChangedDeserializer; +import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics; @@ -88,10 +88,10 @@ public class SourceNode extends ProcessorNode { this.valDeserializer = (Deserializer) context.valueSerde().deserializer(); } - // if value deserializers are for {@code Change} values, set the inner deserializer when necessary - if (this.valDeserializer instanceof ChangedDeserializer && - ((ChangedDeserializer) this.valDeserializer).inner() == null) { - ((ChangedDeserializer) this.valDeserializer).setInner(context.valueSerde().deserializer()); + // if value deserializers are internal wrapping deserializers that may need to be given the default + // then pass it the default one from the context + if (valDeserializer instanceof WrappingNullableDeserializer) { + ((WrappingNullableDeserializer) valDeserializer).setIfUnset(context.valueSerde().deserializer()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java b/streams/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java index 47e2d95da63..dbe5bc2981c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java @@ -39,6 +39,7 @@ import org.junit.runners.Suite; KTableKTableForeignKeyInnerJoinMultiIntegrationTest.class, KTableKTableForeignKeyJoinIntegrationTest.class, KTableKTableForeignKeyJoinMaterializationIntegrationTest.class, + KTableKTableForeignKeyJoinDefaultSerdeTest.class, CombinedKeySchemaTest.class, SubscriptionWrapperSerdeTest.class, SubscriptionResponseWrapperSerdeTest.class, diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java new file mode 100644 index 00000000000..df6349a8510 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.state.KeyValueStore; +import org.junit.Test; + +import java.util.Collections; +import java.util.Map; +import java.util.Properties; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +public class KTableKTableForeignKeyJoinDefaultSerdeTest { + @Test + public void shouldWorkWithDefaultSerdes() { + final StreamsBuilder builder = new StreamsBuilder(); + final KTable aTable = builder.table("A"); + final KTable bTable = builder.table("B"); + + final KTable fkJoinResult = aTable.join( + bTable, + value -> value.split("-")[0], + (aVal, bVal) -> "(" + aVal + "," + bVal + ")", + Materialized.as("asdf") + ); + + final KTable finalJoinResult = aTable.join( + fkJoinResult, + (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")" + ); + + finalJoinResult.toStream().to("output"); + + validateTopologyCanProcessData(builder); + } + + @Test + public void shouldWorkWithDefaultAndConsumedSerdes() { + final StreamsBuilder builder = new StreamsBuilder(); + final KTable aTable = builder.table("A", Consumed.with(Serdes.String(), Serdes.String())); + final KTable bTable = builder.table("B"); + + final KTable fkJoinResult = aTable.join( + bTable, + value -> value.split("-")[0], + (aVal, bVal) -> "(" + aVal + "," + bVal + ")", + Materialized.as("asdf") + ); + + final KTable finalJoinResult = aTable.join( + fkJoinResult, + (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")" + ); + + finalJoinResult.toStream().to("output"); + + validateTopologyCanProcessData(builder); + } + + @Test + public void shouldWorkWithDefaultAndJoinResultSerdes() { + final StreamsBuilder builder = new StreamsBuilder(); + final KTable aTable = builder.table("A"); + final KTable bTable = builder.table("B"); + + final KTable fkJoinResult = aTable.join( + bTable, + value -> value.split("-")[0], + (aVal, bVal) -> "(" + aVal + "," + bVal + ")", + Materialized + .>as("asdf") + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.String()) + ); + + final KTable finalJoinResult = aTable.join( + fkJoinResult, + (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")" + ); + + finalJoinResult.toStream().to("output"); + + validateTopologyCanProcessData(builder); + } + + @Test + public void shouldWorkWithDefaultAndEquiJoinResultSerdes() { + final StreamsBuilder builder = new StreamsBuilder(); + final KTable aTable = builder.table("A"); + final KTable bTable = builder.table("B"); + + final KTable fkJoinResult = aTable.join( + bTable, + value -> value.split("-")[0], + (aVal, bVal) -> "(" + aVal + "," + bVal + ")", + Materialized.as("asdf") + ); + + final KTable finalJoinResult = aTable.join( + fkJoinResult, + (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")", + Materialized.with(Serdes.String(), Serdes.String()) + ); + + finalJoinResult.toStream().to("output"); + + validateTopologyCanProcessData(builder); + } + + @Test + public void shouldWorkWithDefaultAndProducedSerdes() { + final StreamsBuilder builder = new StreamsBuilder(); + final KTable aTable = builder.table("A"); + final KTable bTable = builder.table("B"); + + final KTable fkJoinResult = aTable.join( + bTable, + value -> value.split("-")[0], + (aVal, bVal) -> "(" + aVal + "," + bVal + ")", + Materialized.as("asdf") + ); + + final KTable finalJoinResult = aTable.join( + fkJoinResult, + (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")" + ); + + finalJoinResult.toStream().to("output", Produced.with(Serdes.String(), Serdes.String())); + + validateTopologyCanProcessData(builder); + } + + private static void validateTopologyCanProcessData(final StreamsBuilder builder) { + final Properties config = new Properties(); + config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy"); + config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"); + config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); + config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); + try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), config)) { + final TestInputTopic aTopic = topologyTestDriver.createInputTopic("A", new StringSerializer(), new StringSerializer()); + final TestInputTopic bTopic = topologyTestDriver.createInputTopic("B", new StringSerializer(), new StringSerializer()); + final TestOutputTopic output = topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer()); + aTopic.pipeInput("a1", "b1-alpha"); + bTopic.pipeInput("b1", "beta"); + final Map x = output.readKeyValuesToMap(); + assertThat(x, is(Collections.singletonMap("a1", "(b1-alpha,(b1-alpha,beta))"))); + } + } +}