From e16859dc48c679b3c7d9735438df046479b8ec4a Mon Sep 17 00:00:00 2001 From: John Roesler Date: Tue, 11 Feb 2020 17:34:00 -0600 Subject: [PATCH] KAFKA-9390: Make serde pseudo-topics unique (#8054) During the discussion for KIP-213, we decided to pass "pseudo-topics" to the internal serdes we use to construct the wrapper serdes for CombinedKey and hashing the left-hand-side value. However, during the implementation, this strategy wasn't fully implemented, and we wound up using the same topic name for a few different data types. Reviewers: Guozhang Wang --- .../streams/kstream/internals/KTableImpl.java | 23 ++- .../foreignkeyjoin/CombinedKeySchema.java | 23 ++- ...JoinSubscriptionSendProcessorSupplier.java | 17 +- .../SubscriptionWrapperSerde.java | 35 +++-- ...reignKeyInnerJoinMultiIntegrationTest.java | 32 +++- ...eKTableForeignKeyJoinDefaultSerdeTest.java | 5 +- ...leKTableForeignKeyJoinIntegrationTest.java | 16 +- .../foreignkeyjoin/CombinedKeySchemaTest.java | 18 ++- .../SubscriptionWrapperSerdeTest.java | 8 +- .../streams/utils/UniqueTopicSerdeScope.java | 148 ++++++++++++++++++ 10 files changed, 272 insertions(+), 53 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/utils/UniqueTopicSerdeScope.java diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index f5c5aece072..bc14eee78a3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -953,23 +953,34 @@ public class KTableImpl extends AbstractStream implements KTable< //This occurs whenever the extracted foreignKey changes values. enableSendingOldValues(); - final Serde foreignKeySerde = ((KTableImpl) foreignKeyTable).keySerde; - final Serde> subscriptionWrapperSerde = new SubscriptionWrapperSerde<>(keySerde); - final SubscriptionResponseWrapperSerde responseWrapperSerde = - new SubscriptionResponseWrapperSerde<>(((KTableImpl) foreignKeyTable).valSerde); final NamedInternal renamed = new NamedInternal(joinName); final String subscriptionTopicName = renamed.suffixWithOrElseGet("-subscription-registration", builder, SUBSCRIPTION_REGISTRATION) + TOPIC_SUFFIX; + final String subscriptionPrimaryKeySerdePseudoTopic = subscriptionTopicName + "-pk"; + final String subscriptionForeignKeySerdePseudoTopic = subscriptionTopicName + "-fk"; + final String valueHashSerdePseudoTopic = subscriptionTopicName + "-vh"; builder.internalTopologyBuilder.addInternalTopic(subscriptionTopicName); - final CombinedKeySchema combinedKeySchema = new CombinedKeySchema<>(subscriptionTopicName, foreignKeySerde, keySerde); + + final Serde foreignKeySerde = ((KTableImpl) foreignKeyTable).keySerde; + final Serde> subscriptionWrapperSerde = new SubscriptionWrapperSerde<>(subscriptionPrimaryKeySerdePseudoTopic, keySerde); + final SubscriptionResponseWrapperSerde responseWrapperSerde = + new SubscriptionResponseWrapperSerde<>(((KTableImpl) foreignKeyTable).valSerde); + + final CombinedKeySchema combinedKeySchema = new CombinedKeySchema<>( + subscriptionForeignKeySerdePseudoTopic, + foreignKeySerde, + subscriptionPrimaryKeySerdePseudoTopic, + keySerde + ); final ProcessorGraphNode> subscriptionNode = new ProcessorGraphNode<>( new ProcessorParameters<>( new ForeignJoinSubscriptionSendProcessorSupplier<>( foreignKeyExtractor, + subscriptionForeignKeySerdePseudoTopic, + valueHashSerdePseudoTopic, foreignKeySerde, - subscriptionTopicName, valSerde == null ? null : valSerde.serializer(), leftJoin ), diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java index 7cda404e01d..92fb72c7ae6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java @@ -28,14 +28,19 @@ import java.nio.ByteBuffer; * Factory for creating CombinedKey serializers / deserializers. */ public class CombinedKeySchema { - private final String serdeTopic; + private final String primaryKeySerdeTopic; + private final String foreignKeySerdeTopic; private Serializer primaryKeySerializer; private Deserializer primaryKeyDeserializer; private Serializer foreignKeySerializer; private Deserializer foreignKeyDeserializer; - public CombinedKeySchema(final String serdeTopic, final Serde foreignKeySerde, final Serde primaryKeySerde) { - this.serdeTopic = serdeTopic; + public CombinedKeySchema(final String foreignKeySerdeTopic, + final Serde foreignKeySerde, + final String primaryKeySerdeTopic, + final Serde primaryKeySerde) { + this.primaryKeySerdeTopic = primaryKeySerdeTopic; + this.foreignKeySerdeTopic = foreignKeySerdeTopic; primaryKeySerializer = primaryKeySerde == null ? null : primaryKeySerde.serializer(); primaryKeyDeserializer = primaryKeySerde == null ? null : primaryKeySerde.deserializer(); foreignKeyDeserializer = foreignKeySerde == null ? null : foreignKeySerde.deserializer(); @@ -54,10 +59,12 @@ public class CombinedKeySchema { //The serialization format - note that primaryKeySerialized may be null, such as when a prefixScan //key is being created. //{Integer.BYTES foreignKeyLength}{foreignKeySerialized}{Optional-primaryKeySerialized} - final byte[] foreignKeySerializedData = foreignKeySerializer.serialize(serdeTopic, foreignKey); + final byte[] foreignKeySerializedData = foreignKeySerializer.serialize(foreignKeySerdeTopic, + foreignKey); //? bytes - final byte[] primaryKeySerializedData = primaryKeySerializer.serialize(serdeTopic, primaryKey); + final byte[] primaryKeySerializedData = primaryKeySerializer.serialize(primaryKeySerdeTopic, + primaryKey); final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + foreignKeySerializedData.length + primaryKeySerializedData.length); buf.putInt(foreignKeySerializedData.length); @@ -74,11 +81,11 @@ public class CombinedKeySchema { final int foreignKeyLength = dataBuffer.getInt(); final byte[] foreignKeyRaw = new byte[foreignKeyLength]; dataBuffer.get(foreignKeyRaw, 0, foreignKeyLength); - final KO foreignKey = foreignKeyDeserializer.deserialize(serdeTopic, foreignKeyRaw); + final KO foreignKey = foreignKeyDeserializer.deserialize(foreignKeySerdeTopic, foreignKeyRaw); final byte[] primaryKeyRaw = new byte[dataArray.length - foreignKeyLength - Integer.BYTES]; dataBuffer.get(primaryKeyRaw, 0, primaryKeyRaw.length); - final K primaryKey = primaryKeyDeserializer.deserialize(serdeTopic, primaryKeyRaw); + final K primaryKey = primaryKeyDeserializer.deserialize(primaryKeySerdeTopic, primaryKeyRaw); return new CombinedKey<>(foreignKey, primaryKey); } @@ -86,7 +93,7 @@ public class CombinedKeySchema { //The serialization format. Note that primaryKeySerialized is not required/used in this function. //{Integer.BYTES foreignKeyLength}{foreignKeySerialized}{Optional-primaryKeySerialized} - final byte[] foreignKeySerializedData = foreignKeySerializer.serialize(serdeTopic, key); + final byte[] foreignKeySerializedData = foreignKeySerializer.serialize(foreignKeySerdeTopic, key); final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + foreignKeySerializedData.length); buf.putInt(foreignKeySerializedData.length); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java index 06bd7d25ea5..ba794f7a972 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java @@ -43,20 +43,23 @@ public class ForeignJoinSubscriptionSendProcessorSupplier implements P private static final Logger LOG = LoggerFactory.getLogger(ForeignJoinSubscriptionSendProcessorSupplier.class); private final Function foreignKeyExtractor; - private final String repartitionTopicName; + private final String foreignKeySerdeTopic; + private final String valueSerdeTopic; private final boolean leftJoin; private Serializer foreignKeySerializer; private Serializer valueSerializer; public ForeignJoinSubscriptionSendProcessorSupplier(final Function foreignKeyExtractor, + final String foreignKeySerdeTopic, + final String valueSerdeTopic, final Serde foreignKeySerde, - final String repartitionTopicName, final Serializer valueSerializer, final boolean leftJoin) { this.foreignKeyExtractor = foreignKeyExtractor; + this.foreignKeySerdeTopic = foreignKeySerdeTopic; + this.valueSerdeTopic = valueSerdeTopic; this.valueSerializer = valueSerializer; this.leftJoin = leftJoin; - this.repartitionTopicName = repartitionTopicName; foreignKeySerializer = foreignKeySerde == null ? null : foreignKeySerde.serializer(); } @@ -91,7 +94,7 @@ public class ForeignJoinSubscriptionSendProcessorSupplier implements P public void process(final K key, final Change change) { final long[] currentHash = change.newValue == null ? null : - Murmur3.hash128(valueSerializer.serialize(repartitionTopicName, change.newValue)); + Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, change.newValue)); if (change.oldValue != null) { final KO oldForeignKey = foreignKeyExtractor.apply(change.oldValue); @@ -114,8 +117,10 @@ public class ForeignJoinSubscriptionSendProcessorSupplier implements P return; } - final byte[] serialOldForeignKey = foreignKeySerializer.serialize(repartitionTopicName, oldForeignKey); - final byte[] serialNewForeignKey = foreignKeySerializer.serialize(repartitionTopicName, newForeignKey); + final byte[] serialOldForeignKey = + foreignKeySerializer.serialize(foreignKeySerdeTopic, oldForeignKey); + final byte[] serialNewForeignKey = + foreignKeySerializer.serialize(foreignKeySerdeTopic, newForeignKey); if (!Arrays.equals(serialNewForeignKey, serialOldForeignKey)) { //Different Foreign Key - delete the old key value and propagate the new one. //Delete it from the oldKey's state store diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java index 1cb32935fbe..42aed940ef4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java @@ -30,9 +30,12 @@ public class SubscriptionWrapperSerde implements Serde private final SubscriptionWrapperSerializer serializer; private final SubscriptionWrapperDeserializer deserializer; - public SubscriptionWrapperSerde(final Serde primaryKeySerde) { - serializer = new SubscriptionWrapperSerializer<>(primaryKeySerde == null ? null : primaryKeySerde.serializer()); - deserializer = new SubscriptionWrapperDeserializer<>(primaryKeySerde == null ? null : primaryKeySerde.deserializer()); + public SubscriptionWrapperSerde(final String primaryKeySerializationPseudoTopic, + final Serde primaryKeySerde) { + serializer = new SubscriptionWrapperSerializer<>(primaryKeySerializationPseudoTopic, + primaryKeySerde == null ? null : primaryKeySerde.serializer()); + deserializer = new SubscriptionWrapperDeserializer<>(primaryKeySerializationPseudoTopic, + primaryKeySerde == null ? null : primaryKeySerde.deserializer()); } @Override @@ -45,12 +48,15 @@ public class SubscriptionWrapperSerde implements Serde return deserializer; } - public static class SubscriptionWrapperSerializer + private static class SubscriptionWrapperSerializer implements Serializer>, WrappingNullableSerializer, K> { + private final String primaryKeySerializationPseudoTopic; private Serializer primaryKeySerializer; - SubscriptionWrapperSerializer(final Serializer primaryKeySerializer) { + SubscriptionWrapperSerializer(final String primaryKeySerializationPseudoTopic, + final Serializer primaryKeySerializer) { + this.primaryKeySerializationPseudoTopic = primaryKeySerializationPseudoTopic; this.primaryKeySerializer = primaryKeySerializer; } @@ -62,7 +68,7 @@ public class SubscriptionWrapperSerde implements Serde } @Override - public byte[] serialize(final String topic, final SubscriptionWrapper data) { + public byte[] serialize(final String ignored, final SubscriptionWrapper data) { //{1-bit-isHashNull}{7-bits-version}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized} //7-bit (0x7F) maximum for data version. @@ -70,7 +76,10 @@ public class SubscriptionWrapperSerde implements Serde throw new UnsupportedVersionException("SubscriptionWrapper version is larger than maximum supported 0x7F"); } - final byte[] primaryKeySerializedData = primaryKeySerializer.serialize(topic, data.getPrimaryKey()); + final byte[] primaryKeySerializedData = primaryKeySerializer.serialize( + primaryKeySerializationPseudoTopic, + data.getPrimaryKey() + ); final ByteBuffer buf; if (data.getHash() != null) { @@ -94,12 +103,15 @@ public class SubscriptionWrapperSerde implements Serde } - public static class SubscriptionWrapperDeserializer + private static class SubscriptionWrapperDeserializer implements Deserializer>, WrappingNullableDeserializer, K> { + private final String primaryKeySerializationPseudoTopic; private Deserializer primaryKeyDeserializer; - SubscriptionWrapperDeserializer(final Deserializer primaryKeyDeserializer) { + SubscriptionWrapperDeserializer(final String primaryKeySerializationPseudoTopic, + final Deserializer primaryKeyDeserializer) { + this.primaryKeySerializationPseudoTopic = primaryKeySerializationPseudoTopic; this.primaryKeyDeserializer = primaryKeyDeserializer; } @@ -111,7 +123,7 @@ public class SubscriptionWrapperSerde implements Serde } @Override - public SubscriptionWrapper deserialize(final String topic, final byte[] data) { + public SubscriptionWrapper deserialize(final String ignored, final byte[] data) { //{7-bits-version}{1-bit-isHashNull}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized} final ByteBuffer buf = ByteBuffer.wrap(data); final byte versionAndIsHashNull = buf.get(); @@ -132,7 +144,8 @@ public class SubscriptionWrapperSerde implements Serde final byte[] primaryKeyRaw = new byte[data.length - lengthSum]; //The remaining data is the serialized pk buf.get(primaryKeyRaw, 0, primaryKeyRaw.length); - final K primaryKey = primaryKeyDeserializer.deserialize(topic, primaryKeyRaw); + final K primaryKey = primaryKeyDeserializer.deserialize(primaryKeySerializationPseudoTopic, + primaryKeyRaw); return new SubscriptionWrapper<>(hash, inst, primaryKey, version); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java index ad746d8f611..c7cb7120e07 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java @@ -39,6 +39,7 @@ import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.utils.UniqueTopicSerdeScope; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; import org.junit.After; @@ -206,17 +207,30 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest { } private KafkaStreams prepareTopology(final String queryableName, final String queryableNameTwo) { + final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope(); final StreamsBuilder builder = new StreamsBuilder(); - final KTable table1 = builder.table(TABLE_1, Consumed.with(Serdes.Integer(), Serdes.Float())); - final KTable table2 = builder.table(TABLE_2, Consumed.with(Serdes.String(), Serdes.Long())); - final KTable table3 = builder.table(TABLE_3, Consumed.with(Serdes.Integer(), Serdes.String())); + final KTable table1 = builder.table( + TABLE_1, + Consumed.with(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true), + serdeScope.decorateSerde(Serdes.Float(), streamsConfig, false)) + ); + final KTable table2 = builder.table( + TABLE_2, + Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true), + serdeScope.decorateSerde(Serdes.Long(), streamsConfig, false)) + ); + final KTable table3 = builder.table( + TABLE_3, + Consumed.with(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true), + serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)) + ); final Materialized> materialized; if (queryableName != null) { materialized = Materialized.>as(queryableName) - .withKeySerde(Serdes.Integer()) - .withValueSerde(Serdes.String()) + .withKeySerde(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true)) + .withValueSerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)) .withCachingDisabled(); } else { throw new RuntimeException("Current implementation of joinOnForeignKey requires a materialized store"); @@ -225,8 +239,8 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest { final Materialized> materializedTwo; if (queryableNameTwo != null) { materializedTwo = Materialized.>as(queryableNameTwo) - .withKeySerde(Serdes.Integer()) - .withValueSerde(Serdes.String()) + .withKeySerde(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true)) + .withValueSerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)) .withCachingDisabled(); } else { throw new RuntimeException("Current implementation of joinOnForeignKey requires a materialized store"); @@ -247,7 +261,9 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest { table1.join(table2, tableOneKeyExtractor, joiner, materialized) .join(table3, joinedTableKeyExtractor, joinerTwo, materializedTwo) .toStream() - .to(OUTPUT, Produced.with(Serdes.Integer(), Serdes.String())); + .to(OUTPUT, + Produced.with(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true), + serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))); return new KafkaStreams(builder.build(streamsConfig), streamsConfig); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java index df6349a8510..5af0530ca53 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java @@ -30,11 +30,13 @@ import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.test.TestUtils; import org.junit.Test; import java.util.Collections; import java.util.Map; import java.util.Properties; +import java.util.UUID; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -161,10 +163,11 @@ public class KTableKTableForeignKeyJoinDefaultSerdeTest { private static void validateTopologyCanProcessData(final StreamsBuilder builder) { final Properties config = new Properties(); - config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy"); + config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-" + UUID.randomUUID()); config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"); config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); + config.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), config)) { final TestInputTopic aTopic = topologyTestDriver.createInputTopic("A", new StringSerializer(), new StringSerializer()); final TestInputTopic bTopic = topologyTestDriver.createInputTopic("B", new StringSerializer(), new StringSerializer()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java index 746d6b3195b..db0e2de1384 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.utils.UniqueTopicSerdeScope; import org.apache.kafka.test.TestUtils; import org.junit.Test; import org.junit.runner.RunWith; @@ -506,17 +507,26 @@ public class KTableKTableForeignKeyJoinIntegrationTest { private static Topology getTopology(final Properties streamsConfig, final String queryableStoreName, final boolean leftJoin) { + final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope(); final StreamsBuilder builder = new StreamsBuilder(); - final KTable left = builder.table(LEFT_TABLE, Consumed.with(Serdes.String(), Serdes.String())); - final KTable right = builder.table(RIGHT_TABLE, Consumed.with(Serdes.String(), Serdes.String())); + final KTable left = builder.table( + LEFT_TABLE, + Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true), + serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)) + ); + final KTable right = builder.table( + RIGHT_TABLE, + Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true), + serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)) + ); final Function extractor = value -> value.split("\\|")[1]; final ValueJoiner joiner = (value1, value2) -> "(" + value1 + "," + value2 + ")"; final Materialized> materialized = Materialized.as(Stores.inMemoryKeyValueStore(queryableStoreName)) - .withValueSerde(Serdes.String()) + .withValueSerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)) // the cache suppresses some of the unnecessary tombstones we want to make assertions about .withCachingDisabled(); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java index 47348038521..cb1ef596db3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java @@ -28,7 +28,8 @@ public class CombinedKeySchemaTest { @Test public void nonNullPrimaryKeySerdeTest() { - final CombinedKeySchema cks = new CombinedKeySchema<>("someTopic", Serdes.String(), Serdes.Integer()); + final CombinedKeySchema cks = new CombinedKeySchema<>("fkTopic", Serdes.String(), + "pkTopic", Serdes.Integer()); final Integer primary = -999; final Bytes result = cks.toBytes("foreignKey", primary); @@ -39,21 +40,25 @@ public class CombinedKeySchemaTest { @Test(expected = NullPointerException.class) public void nullPrimaryKeySerdeTest() { - final CombinedKeySchema cks = new CombinedKeySchema<>("someTopic", Serdes.String(), Serdes.Integer()); + final CombinedKeySchema cks = new CombinedKeySchema<>("fkTopic", Serdes.String(), + "pkTopic", Serdes.Integer()); cks.toBytes("foreignKey", null); } @Test(expected = NullPointerException.class) public void nullForeignKeySerdeTest() { - final CombinedKeySchema cks = new CombinedKeySchema<>("someTopic", Serdes.String(), Serdes.Integer()); + final CombinedKeySchema cks = new CombinedKeySchema<>("fkTopic", Serdes.String(), + "pkTopic", Serdes.Integer()); cks.toBytes(null, 10); } @Test public void prefixKeySerdeTest() { - final CombinedKeySchema cks = new CombinedKeySchema<>("someTopic", Serdes.String(), Serdes.Integer()); + final CombinedKeySchema cks = new CombinedKeySchema<>("fkTopic", Serdes.String(), + "pkTopic", Serdes.Integer()); final String foreignKey = "someForeignKey"; - final byte[] foreignKeySerializedData = Serdes.String().serializer().serialize("someTopic", foreignKey); + final byte[] foreignKeySerializedData = + Serdes.String().serializer().serialize("fkTopic", foreignKey); final Bytes prefix = cks.prefixBytes(foreignKey); final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + foreignKeySerializedData.length); @@ -66,7 +71,8 @@ public class CombinedKeySchemaTest { @Test(expected = NullPointerException.class) public void nullPrefixKeySerdeTest() { - final CombinedKeySchema cks = new CombinedKeySchema<>("someTopic", Serdes.String(), Serdes.Integer()); + final CombinedKeySchema cks = new CombinedKeySchema<>("fkTopic", Serdes.String(), + "pkTopic", Serdes.Integer()); final String foreignKey = null; cks.prefixBytes(foreignKey); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java index d948d1f2a35..5c6551c3e7d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java @@ -31,7 +31,7 @@ public class SubscriptionWrapperSerdeTest { @SuppressWarnings("unchecked") public void shouldSerdeTest() { final String originalKey = "originalKey"; - final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(Serdes.String()); + final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>("pkTopic", Serdes.String()); final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19}); final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, originalKey); final byte[] serialized = swSerde.serializer().serialize(null, wrapper); @@ -46,7 +46,7 @@ public class SubscriptionWrapperSerdeTest { @SuppressWarnings("unchecked") public void shouldSerdeNullHashTest() { final String originalKey = "originalKey"; - final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(Serdes.String()); + final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>("pkTopic", Serdes.String()); final long[] hashedValue = null; final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, originalKey); final byte[] serialized = swSerde.serializer().serialize(null, wrapper); @@ -61,7 +61,7 @@ public class SubscriptionWrapperSerdeTest { @SuppressWarnings("unchecked") public void shouldThrowExceptionOnNullKeyTest() { final String originalKey = null; - final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(Serdes.String()); + final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>("pkTopic", Serdes.String()); final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19}); final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, originalKey); swSerde.serializer().serialize(null, wrapper); @@ -71,7 +71,7 @@ public class SubscriptionWrapperSerdeTest { @SuppressWarnings("unchecked") public void shouldThrowExceptionOnNullInstructionTest() { final String originalKey = "originalKey"; - final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(Serdes.String()); + final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>("pkTopic", Serdes.String()); final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19}); final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, null, originalKey); swSerde.serializer().serialize(null, wrapper); diff --git a/streams/src/test/java/org/apache/kafka/streams/utils/UniqueTopicSerdeScope.java b/streams/src/test/java/org/apache/kafka/streams/utils/UniqueTopicSerdeScope.java new file mode 100644 index 00000000000..04b1a7b2a78 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/utils/UniqueTopicSerdeScope.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.utils; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; + +import java.util.Map; +import java.util.Properties; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +public class UniqueTopicSerdeScope { + private final Map> topicTypeRegistry = new TreeMap<>(); + + public UniqueTopicSerdeDecorator decorateSerde(final Serde delegate, + final Properties config, + final boolean isKey) { + final UniqueTopicSerdeDecorator decorator = new UniqueTopicSerdeDecorator<>(delegate); + decorator.configure(config.entrySet().stream().collect(Collectors.toMap(e -> e.getKey().toString(), Map.Entry::getValue)), isKey); + return decorator; + } + + public class UniqueTopicSerdeDecorator implements Serde { + private final AtomicBoolean isKey = new AtomicBoolean(false); + private final Serde delegate; + + public UniqueTopicSerdeDecorator(final Serde delegate) { + this.delegate = delegate; + } + + @Override + public void configure(final Map configs, final boolean isKey) { + delegate.configure(configs, isKey); + this.isKey.set(isKey); + } + + @Override + public void close() { + delegate.close(); + } + + @Override + public Serializer serializer() { + return new UniqueTopicSerializerDecorator<>(isKey, delegate.serializer()); + } + + @Override + public Deserializer deserializer() { + return new UniqueTopicDeserializerDecorator<>(isKey, delegate.deserializer()); + } + } + + public class UniqueTopicSerializerDecorator implements Serializer { + private final AtomicBoolean isKey; + private final Serializer delegate; + + public UniqueTopicSerializerDecorator(final AtomicBoolean isKey, final Serializer delegate) { + this.isKey = isKey; + this.delegate = delegate; + } + + @Override + public void configure(final Map configs, final boolean isKey) { + delegate.configure(configs, isKey); + this.isKey.set(isKey); + } + + @Override + public byte[] serialize(final String topic, final T data) { + verifyTopic(topic, data); + return delegate.serialize(topic, data); + } + + @Override + public byte[] serialize(final String topic, final Headers headers, final T data) { + verifyTopic(topic, data); + return delegate.serialize(topic, headers, data); + } + + private void verifyTopic(final String topic, final T data) { + if (data != null) { + final String key = topic + (isKey.get() ? "--key" : "--value"); + if (topicTypeRegistry.containsKey(key)) { + assertThat(String.format("key[%s] data[%s][%s]", key, data, data.getClass()), topicTypeRegistry.get(key), equalTo(data.getClass())); + } else { + topicTypeRegistry.put(key, data.getClass()); + } + } + } + + @Override + public void close() { + delegate.close(); + } + } + + public class UniqueTopicDeserializerDecorator implements Deserializer { + private final AtomicBoolean isKey; + private final Deserializer delegate; + + public UniqueTopicDeserializerDecorator(final AtomicBoolean isKey, final Deserializer delegate) { + this.isKey = isKey; + this.delegate = delegate; + } + + @Override + public void configure(final Map configs, final boolean isKey) { + delegate.configure(configs, isKey); + this.isKey.set(isKey); + } + + @Override + public T deserialize(final String topic, final byte[] data) { + return delegate.deserialize(topic, data); + } + + @Override + public T deserialize(final String topic, final Headers headers, final byte[] data) { + return delegate.deserialize(topic, headers, data); + } + + @Override + public void close() { + delegate.close(); + } + } +}