Browse Source

KAFKA-9390: Make serde pseudo-topics unique (#8054)

During the discussion for KIP-213, we decided to pass "pseudo-topics"
to the internal serdes we use to construct the wrapper serdes for
CombinedKey and hashing the left-hand-side value. However, during
the implementation, this strategy wasn't fully implemented, and we wound
up using the same topic name for a few different data types.

Reviewers: Guozhang Wang <guozhang@confluent.io>
pull/7731/head
John Roesler 5 years ago committed by GitHub
parent
commit
e16859dc48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 23
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
  2. 23
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
  3. 17
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
  4. 35
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
  5. 32
      streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
  6. 5
      streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java
  7. 16
      streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
  8. 18
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java
  9. 8
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
  10. 148
      streams/src/test/java/org/apache/kafka/streams/utils/UniqueTopicSerdeScope.java

23
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java

@ -953,23 +953,34 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
//This occurs whenever the extracted foreignKey changes values. //This occurs whenever the extracted foreignKey changes values.
enableSendingOldValues(); enableSendingOldValues();
final Serde<KO> foreignKeySerde = ((KTableImpl<KO, VO, ?>) foreignKeyTable).keySerde;
final Serde<SubscriptionWrapper<K>> subscriptionWrapperSerde = new SubscriptionWrapperSerde<>(keySerde);
final SubscriptionResponseWrapperSerde<VO> responseWrapperSerde =
new SubscriptionResponseWrapperSerde<>(((KTableImpl<KO, VO, VO>) foreignKeyTable).valSerde);
final NamedInternal renamed = new NamedInternal(joinName); final NamedInternal renamed = new NamedInternal(joinName);
final String subscriptionTopicName = renamed.suffixWithOrElseGet("-subscription-registration", builder, SUBSCRIPTION_REGISTRATION) + TOPIC_SUFFIX; final String subscriptionTopicName = renamed.suffixWithOrElseGet("-subscription-registration", builder, SUBSCRIPTION_REGISTRATION) + TOPIC_SUFFIX;
final String subscriptionPrimaryKeySerdePseudoTopic = subscriptionTopicName + "-pk";
final String subscriptionForeignKeySerdePseudoTopic = subscriptionTopicName + "-fk";
final String valueHashSerdePseudoTopic = subscriptionTopicName + "-vh";
builder.internalTopologyBuilder.addInternalTopic(subscriptionTopicName); builder.internalTopologyBuilder.addInternalTopic(subscriptionTopicName);
final CombinedKeySchema<KO, K> combinedKeySchema = new CombinedKeySchema<>(subscriptionTopicName, foreignKeySerde, keySerde);
final Serde<KO> foreignKeySerde = ((KTableImpl<KO, VO, ?>) foreignKeyTable).keySerde;
final Serde<SubscriptionWrapper<K>> subscriptionWrapperSerde = new SubscriptionWrapperSerde<>(subscriptionPrimaryKeySerdePseudoTopic, keySerde);
final SubscriptionResponseWrapperSerde<VO> responseWrapperSerde =
new SubscriptionResponseWrapperSerde<>(((KTableImpl<KO, VO, VO>) foreignKeyTable).valSerde);
final CombinedKeySchema<KO, K> combinedKeySchema = new CombinedKeySchema<>(
subscriptionForeignKeySerdePseudoTopic,
foreignKeySerde,
subscriptionPrimaryKeySerdePseudoTopic,
keySerde
);
final ProcessorGraphNode<K, Change<V>> subscriptionNode = new ProcessorGraphNode<>( final ProcessorGraphNode<K, Change<V>> subscriptionNode = new ProcessorGraphNode<>(
new ProcessorParameters<>( new ProcessorParameters<>(
new ForeignJoinSubscriptionSendProcessorSupplier<>( new ForeignJoinSubscriptionSendProcessorSupplier<>(
foreignKeyExtractor, foreignKeyExtractor,
subscriptionForeignKeySerdePseudoTopic,
valueHashSerdePseudoTopic,
foreignKeySerde, foreignKeySerde,
subscriptionTopicName,
valSerde == null ? null : valSerde.serializer(), valSerde == null ? null : valSerde.serializer(),
leftJoin leftJoin
), ),

23
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java

@ -28,14 +28,19 @@ import java.nio.ByteBuffer;
* Factory for creating CombinedKey serializers / deserializers. * Factory for creating CombinedKey serializers / deserializers.
*/ */
public class CombinedKeySchema<KO, K> { public class CombinedKeySchema<KO, K> {
private final String serdeTopic; private final String primaryKeySerdeTopic;
private final String foreignKeySerdeTopic;
private Serializer<K> primaryKeySerializer; private Serializer<K> primaryKeySerializer;
private Deserializer<K> primaryKeyDeserializer; private Deserializer<K> primaryKeyDeserializer;
private Serializer<KO> foreignKeySerializer; private Serializer<KO> foreignKeySerializer;
private Deserializer<KO> foreignKeyDeserializer; private Deserializer<KO> foreignKeyDeserializer;
public CombinedKeySchema(final String serdeTopic, final Serde<KO> foreignKeySerde, final Serde<K> primaryKeySerde) { public CombinedKeySchema(final String foreignKeySerdeTopic,
this.serdeTopic = serdeTopic; final Serde<KO> foreignKeySerde,
final String primaryKeySerdeTopic,
final Serde<K> primaryKeySerde) {
this.primaryKeySerdeTopic = primaryKeySerdeTopic;
this.foreignKeySerdeTopic = foreignKeySerdeTopic;
primaryKeySerializer = primaryKeySerde == null ? null : primaryKeySerde.serializer(); primaryKeySerializer = primaryKeySerde == null ? null : primaryKeySerde.serializer();
primaryKeyDeserializer = primaryKeySerde == null ? null : primaryKeySerde.deserializer(); primaryKeyDeserializer = primaryKeySerde == null ? null : primaryKeySerde.deserializer();
foreignKeyDeserializer = foreignKeySerde == null ? null : foreignKeySerde.deserializer(); foreignKeyDeserializer = foreignKeySerde == null ? null : foreignKeySerde.deserializer();
@ -54,10 +59,12 @@ public class CombinedKeySchema<KO, K> {
//The serialization format - note that primaryKeySerialized may be null, such as when a prefixScan //The serialization format - note that primaryKeySerialized may be null, such as when a prefixScan
//key is being created. //key is being created.
//{Integer.BYTES foreignKeyLength}{foreignKeySerialized}{Optional-primaryKeySerialized} //{Integer.BYTES foreignKeyLength}{foreignKeySerialized}{Optional-primaryKeySerialized}
final byte[] foreignKeySerializedData = foreignKeySerializer.serialize(serdeTopic, foreignKey); final byte[] foreignKeySerializedData = foreignKeySerializer.serialize(foreignKeySerdeTopic,
foreignKey);
//? bytes //? bytes
final byte[] primaryKeySerializedData = primaryKeySerializer.serialize(serdeTopic, primaryKey); final byte[] primaryKeySerializedData = primaryKeySerializer.serialize(primaryKeySerdeTopic,
primaryKey);
final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + foreignKeySerializedData.length + primaryKeySerializedData.length); final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + foreignKeySerializedData.length + primaryKeySerializedData.length);
buf.putInt(foreignKeySerializedData.length); buf.putInt(foreignKeySerializedData.length);
@ -74,11 +81,11 @@ public class CombinedKeySchema<KO, K> {
final int foreignKeyLength = dataBuffer.getInt(); final int foreignKeyLength = dataBuffer.getInt();
final byte[] foreignKeyRaw = new byte[foreignKeyLength]; final byte[] foreignKeyRaw = new byte[foreignKeyLength];
dataBuffer.get(foreignKeyRaw, 0, foreignKeyLength); dataBuffer.get(foreignKeyRaw, 0, foreignKeyLength);
final KO foreignKey = foreignKeyDeserializer.deserialize(serdeTopic, foreignKeyRaw); final KO foreignKey = foreignKeyDeserializer.deserialize(foreignKeySerdeTopic, foreignKeyRaw);
final byte[] primaryKeyRaw = new byte[dataArray.length - foreignKeyLength - Integer.BYTES]; final byte[] primaryKeyRaw = new byte[dataArray.length - foreignKeyLength - Integer.BYTES];
dataBuffer.get(primaryKeyRaw, 0, primaryKeyRaw.length); dataBuffer.get(primaryKeyRaw, 0, primaryKeyRaw.length);
final K primaryKey = primaryKeyDeserializer.deserialize(serdeTopic, primaryKeyRaw); final K primaryKey = primaryKeyDeserializer.deserialize(primaryKeySerdeTopic, primaryKeyRaw);
return new CombinedKey<>(foreignKey, primaryKey); return new CombinedKey<>(foreignKey, primaryKey);
} }
@ -86,7 +93,7 @@ public class CombinedKeySchema<KO, K> {
//The serialization format. Note that primaryKeySerialized is not required/used in this function. //The serialization format. Note that primaryKeySerialized is not required/used in this function.
//{Integer.BYTES foreignKeyLength}{foreignKeySerialized}{Optional-primaryKeySerialized} //{Integer.BYTES foreignKeyLength}{foreignKeySerialized}{Optional-primaryKeySerialized}
final byte[] foreignKeySerializedData = foreignKeySerializer.serialize(serdeTopic, key); final byte[] foreignKeySerializedData = foreignKeySerializer.serialize(foreignKeySerdeTopic, key);
final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + foreignKeySerializedData.length); final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + foreignKeySerializedData.length);
buf.putInt(foreignKeySerializedData.length); buf.putInt(foreignKeySerializedData.length);

17
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java

@ -43,20 +43,23 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
private static final Logger LOG = LoggerFactory.getLogger(ForeignJoinSubscriptionSendProcessorSupplier.class); private static final Logger LOG = LoggerFactory.getLogger(ForeignJoinSubscriptionSendProcessorSupplier.class);
private final Function<V, KO> foreignKeyExtractor; private final Function<V, KO> foreignKeyExtractor;
private final String repartitionTopicName; private final String foreignKeySerdeTopic;
private final String valueSerdeTopic;
private final boolean leftJoin; private final boolean leftJoin;
private Serializer<KO> foreignKeySerializer; private Serializer<KO> foreignKeySerializer;
private Serializer<V> valueSerializer; private Serializer<V> valueSerializer;
public ForeignJoinSubscriptionSendProcessorSupplier(final Function<V, KO> foreignKeyExtractor, public ForeignJoinSubscriptionSendProcessorSupplier(final Function<V, KO> foreignKeyExtractor,
final String foreignKeySerdeTopic,
final String valueSerdeTopic,
final Serde<KO> foreignKeySerde, final Serde<KO> foreignKeySerde,
final String repartitionTopicName,
final Serializer<V> valueSerializer, final Serializer<V> valueSerializer,
final boolean leftJoin) { final boolean leftJoin) {
this.foreignKeyExtractor = foreignKeyExtractor; this.foreignKeyExtractor = foreignKeyExtractor;
this.foreignKeySerdeTopic = foreignKeySerdeTopic;
this.valueSerdeTopic = valueSerdeTopic;
this.valueSerializer = valueSerializer; this.valueSerializer = valueSerializer;
this.leftJoin = leftJoin; this.leftJoin = leftJoin;
this.repartitionTopicName = repartitionTopicName;
foreignKeySerializer = foreignKeySerde == null ? null : foreignKeySerde.serializer(); foreignKeySerializer = foreignKeySerde == null ? null : foreignKeySerde.serializer();
} }
@ -91,7 +94,7 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
public void process(final K key, final Change<V> change) { public void process(final K key, final Change<V> change) {
final long[] currentHash = change.newValue == null ? final long[] currentHash = change.newValue == null ?
null : null :
Murmur3.hash128(valueSerializer.serialize(repartitionTopicName, change.newValue)); Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, change.newValue));
if (change.oldValue != null) { if (change.oldValue != null) {
final KO oldForeignKey = foreignKeyExtractor.apply(change.oldValue); final KO oldForeignKey = foreignKeyExtractor.apply(change.oldValue);
@ -114,8 +117,10 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
return; return;
} }
final byte[] serialOldForeignKey = foreignKeySerializer.serialize(repartitionTopicName, oldForeignKey); final byte[] serialOldForeignKey =
final byte[] serialNewForeignKey = foreignKeySerializer.serialize(repartitionTopicName, newForeignKey); foreignKeySerializer.serialize(foreignKeySerdeTopic, oldForeignKey);
final byte[] serialNewForeignKey =
foreignKeySerializer.serialize(foreignKeySerdeTopic, newForeignKey);
if (!Arrays.equals(serialNewForeignKey, serialOldForeignKey)) { if (!Arrays.equals(serialNewForeignKey, serialOldForeignKey)) {
//Different Foreign Key - delete the old key value and propagate the new one. //Different Foreign Key - delete the old key value and propagate the new one.
//Delete it from the oldKey's state store //Delete it from the oldKey's state store

35
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java

@ -30,9 +30,12 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
private final SubscriptionWrapperSerializer<K> serializer; private final SubscriptionWrapperSerializer<K> serializer;
private final SubscriptionWrapperDeserializer<K> deserializer; private final SubscriptionWrapperDeserializer<K> deserializer;
public SubscriptionWrapperSerde(final Serde<K> primaryKeySerde) { public SubscriptionWrapperSerde(final String primaryKeySerializationPseudoTopic,
serializer = new SubscriptionWrapperSerializer<>(primaryKeySerde == null ? null : primaryKeySerde.serializer()); final Serde<K> primaryKeySerde) {
deserializer = new SubscriptionWrapperDeserializer<>(primaryKeySerde == null ? null : primaryKeySerde.deserializer()); serializer = new SubscriptionWrapperSerializer<>(primaryKeySerializationPseudoTopic,
primaryKeySerde == null ? null : primaryKeySerde.serializer());
deserializer = new SubscriptionWrapperDeserializer<>(primaryKeySerializationPseudoTopic,
primaryKeySerde == null ? null : primaryKeySerde.deserializer());
} }
@Override @Override
@ -45,12 +48,15 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
return deserializer; return deserializer;
} }
public static class SubscriptionWrapperSerializer<K> private static class SubscriptionWrapperSerializer<K>
implements Serializer<SubscriptionWrapper<K>>, WrappingNullableSerializer<SubscriptionWrapper<K>, K> { implements Serializer<SubscriptionWrapper<K>>, WrappingNullableSerializer<SubscriptionWrapper<K>, K> {
private final String primaryKeySerializationPseudoTopic;
private Serializer<K> primaryKeySerializer; private Serializer<K> primaryKeySerializer;
SubscriptionWrapperSerializer(final Serializer<K> primaryKeySerializer) { SubscriptionWrapperSerializer(final String primaryKeySerializationPseudoTopic,
final Serializer<K> primaryKeySerializer) {
this.primaryKeySerializationPseudoTopic = primaryKeySerializationPseudoTopic;
this.primaryKeySerializer = primaryKeySerializer; this.primaryKeySerializer = primaryKeySerializer;
} }
@ -62,7 +68,7 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
} }
@Override @Override
public byte[] serialize(final String topic, final SubscriptionWrapper<K> data) { public byte[] serialize(final String ignored, final SubscriptionWrapper<K> data) {
//{1-bit-isHashNull}{7-bits-version}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized} //{1-bit-isHashNull}{7-bits-version}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}
//7-bit (0x7F) maximum for data version. //7-bit (0x7F) maximum for data version.
@ -70,7 +76,10 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
throw new UnsupportedVersionException("SubscriptionWrapper version is larger than maximum supported 0x7F"); throw new UnsupportedVersionException("SubscriptionWrapper version is larger than maximum supported 0x7F");
} }
final byte[] primaryKeySerializedData = primaryKeySerializer.serialize(topic, data.getPrimaryKey()); final byte[] primaryKeySerializedData = primaryKeySerializer.serialize(
primaryKeySerializationPseudoTopic,
data.getPrimaryKey()
);
final ByteBuffer buf; final ByteBuffer buf;
if (data.getHash() != null) { if (data.getHash() != null) {
@ -94,12 +103,15 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
} }
public static class SubscriptionWrapperDeserializer<K> private static class SubscriptionWrapperDeserializer<K>
implements Deserializer<SubscriptionWrapper<K>>, WrappingNullableDeserializer<SubscriptionWrapper<K>, K> { implements Deserializer<SubscriptionWrapper<K>>, WrappingNullableDeserializer<SubscriptionWrapper<K>, K> {
private final String primaryKeySerializationPseudoTopic;
private Deserializer<K> primaryKeyDeserializer; private Deserializer<K> primaryKeyDeserializer;
SubscriptionWrapperDeserializer(final Deserializer<K> primaryKeyDeserializer) { SubscriptionWrapperDeserializer(final String primaryKeySerializationPseudoTopic,
final Deserializer<K> primaryKeyDeserializer) {
this.primaryKeySerializationPseudoTopic = primaryKeySerializationPseudoTopic;
this.primaryKeyDeserializer = primaryKeyDeserializer; this.primaryKeyDeserializer = primaryKeyDeserializer;
} }
@ -111,7 +123,7 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
} }
@Override @Override
public SubscriptionWrapper<K> deserialize(final String topic, final byte[] data) { public SubscriptionWrapper<K> deserialize(final String ignored, final byte[] data) {
//{7-bits-version}{1-bit-isHashNull}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized} //{7-bits-version}{1-bit-isHashNull}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}
final ByteBuffer buf = ByteBuffer.wrap(data); final ByteBuffer buf = ByteBuffer.wrap(data);
final byte versionAndIsHashNull = buf.get(); final byte versionAndIsHashNull = buf.get();
@ -132,7 +144,8 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
final byte[] primaryKeyRaw = new byte[data.length - lengthSum]; //The remaining data is the serialized pk final byte[] primaryKeyRaw = new byte[data.length - lengthSum]; //The remaining data is the serialized pk
buf.get(primaryKeyRaw, 0, primaryKeyRaw.length); buf.get(primaryKeyRaw, 0, primaryKeyRaw.length);
final K primaryKey = primaryKeyDeserializer.deserialize(topic, primaryKeyRaw); final K primaryKey = primaryKeyDeserializer.deserialize(primaryKeySerializationPseudoTopic,
primaryKeyRaw);
return new SubscriptionWrapper<>(hash, inst, primaryKey, version); return new SubscriptionWrapper<>(hash, inst, primaryKey, version);
} }

32
streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java

@ -39,6 +39,7 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils; import org.apache.kafka.test.TestUtils;
import org.junit.After; import org.junit.After;
@ -206,17 +207,30 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
} }
private KafkaStreams prepareTopology(final String queryableName, final String queryableNameTwo) { private KafkaStreams prepareTopology(final String queryableName, final String queryableNameTwo) {
final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();
final KTable<Integer, Float> table1 = builder.table(TABLE_1, Consumed.with(Serdes.Integer(), Serdes.Float())); final KTable<Integer, Float> table1 = builder.table(
final KTable<String, Long> table2 = builder.table(TABLE_2, Consumed.with(Serdes.String(), Serdes.Long())); TABLE_1,
final KTable<Integer, String> table3 = builder.table(TABLE_3, Consumed.with(Serdes.Integer(), Serdes.String())); Consumed.with(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true),
serdeScope.decorateSerde(Serdes.Float(), streamsConfig, false))
);
final KTable<String, Long> table2 = builder.table(
TABLE_2,
Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
serdeScope.decorateSerde(Serdes.Long(), streamsConfig, false))
);
final KTable<Integer, String> table3 = builder.table(
TABLE_3,
Consumed.with(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true),
serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
);
final Materialized<Integer, String, KeyValueStore<Bytes, byte[]>> materialized; final Materialized<Integer, String, KeyValueStore<Bytes, byte[]>> materialized;
if (queryableName != null) { if (queryableName != null) {
materialized = Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as(queryableName) materialized = Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as(queryableName)
.withKeySerde(Serdes.Integer()) .withKeySerde(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true))
.withValueSerde(Serdes.String()) .withValueSerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
.withCachingDisabled(); .withCachingDisabled();
} else { } else {
throw new RuntimeException("Current implementation of joinOnForeignKey requires a materialized store"); throw new RuntimeException("Current implementation of joinOnForeignKey requires a materialized store");
@ -225,8 +239,8 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
final Materialized<Integer, String, KeyValueStore<Bytes, byte[]>> materializedTwo; final Materialized<Integer, String, KeyValueStore<Bytes, byte[]>> materializedTwo;
if (queryableNameTwo != null) { if (queryableNameTwo != null) {
materializedTwo = Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as(queryableNameTwo) materializedTwo = Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as(queryableNameTwo)
.withKeySerde(Serdes.Integer()) .withKeySerde(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true))
.withValueSerde(Serdes.String()) .withValueSerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
.withCachingDisabled(); .withCachingDisabled();
} else { } else {
throw new RuntimeException("Current implementation of joinOnForeignKey requires a materialized store"); throw new RuntimeException("Current implementation of joinOnForeignKey requires a materialized store");
@ -247,7 +261,9 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
table1.join(table2, tableOneKeyExtractor, joiner, materialized) table1.join(table2, tableOneKeyExtractor, joiner, materialized)
.join(table3, joinedTableKeyExtractor, joinerTwo, materializedTwo) .join(table3, joinedTableKeyExtractor, joinerTwo, materializedTwo)
.toStream() .toStream()
.to(OUTPUT, Produced.with(Serdes.Integer(), Serdes.String())); .to(OUTPUT,
Produced.with(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true),
serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)));
return new KafkaStreams(builder.build(streamsConfig), streamsConfig); return new KafkaStreams(builder.build(streamsConfig), streamsConfig);
} }

5
streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java

@ -30,11 +30,13 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.TestUtils;
import org.junit.Test; import org.junit.Test;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.UUID;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
@ -161,10 +163,11 @@ public class KTableKTableForeignKeyJoinDefaultSerdeTest {
private static void validateTopologyCanProcessData(final StreamsBuilder builder) { private static void validateTopologyCanProcessData(final StreamsBuilder builder) {
final Properties config = new Properties(); final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy"); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-" + UUID.randomUUID());
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"); config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
config.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), config)) { try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), config)) {
final TestInputTopic<String, String> aTopic = topologyTestDriver.createInputTopic("A", new StringSerializer(), new StringSerializer()); final TestInputTopic<String, String> aTopic = topologyTestDriver.createInputTopic("A", new StringSerializer(), new StringSerializer());
final TestInputTopic<String, String> bTopic = topologyTestDriver.createInputTopic("B", new StringSerializer(), new StringSerializer()); final TestInputTopic<String, String> bTopic = topologyTestDriver.createInputTopic("B", new StringSerializer(), new StringSerializer());

16
streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java

@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
import org.apache.kafka.test.TestUtils; import org.apache.kafka.test.TestUtils;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -506,17 +507,26 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
private static Topology getTopology(final Properties streamsConfig, private static Topology getTopology(final Properties streamsConfig,
final String queryableStoreName, final String queryableStoreName,
final boolean leftJoin) { final boolean leftJoin) {
final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, String> left = builder.table(LEFT_TABLE, Consumed.with(Serdes.String(), Serdes.String())); final KTable<String, String> left = builder.table(
final KTable<String, String> right = builder.table(RIGHT_TABLE, Consumed.with(Serdes.String(), Serdes.String())); LEFT_TABLE,
Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
);
final KTable<String, String> right = builder.table(
RIGHT_TABLE,
Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
);
final Function<String, String> extractor = value -> value.split("\\|")[1]; final Function<String, String> extractor = value -> value.split("\\|")[1];
final ValueJoiner<String, String, String> joiner = (value1, value2) -> "(" + value1 + "," + value2 + ")"; final ValueJoiner<String, String, String> joiner = (value1, value2) -> "(" + value1 + "," + value2 + ")";
final Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized = final Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized =
Materialized.<String, String>as(Stores.inMemoryKeyValueStore(queryableStoreName)) Materialized.<String, String>as(Stores.inMemoryKeyValueStore(queryableStoreName))
.withValueSerde(Serdes.String()) .withValueSerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
// the cache suppresses some of the unnecessary tombstones we want to make assertions about // the cache suppresses some of the unnecessary tombstones we want to make assertions about
.withCachingDisabled(); .withCachingDisabled();

18
streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java

@ -28,7 +28,8 @@ public class CombinedKeySchemaTest {
@Test @Test
public void nonNullPrimaryKeySerdeTest() { public void nonNullPrimaryKeySerdeTest() {
final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("someTopic", Serdes.String(), Serdes.Integer()); final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("fkTopic", Serdes.String(),
"pkTopic", Serdes.Integer());
final Integer primary = -999; final Integer primary = -999;
final Bytes result = cks.toBytes("foreignKey", primary); final Bytes result = cks.toBytes("foreignKey", primary);
@ -39,21 +40,25 @@ public class CombinedKeySchemaTest {
@Test(expected = NullPointerException.class) @Test(expected = NullPointerException.class)
public void nullPrimaryKeySerdeTest() { public void nullPrimaryKeySerdeTest() {
final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("someTopic", Serdes.String(), Serdes.Integer()); final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("fkTopic", Serdes.String(),
"pkTopic", Serdes.Integer());
cks.toBytes("foreignKey", null); cks.toBytes("foreignKey", null);
} }
@Test(expected = NullPointerException.class) @Test(expected = NullPointerException.class)
public void nullForeignKeySerdeTest() { public void nullForeignKeySerdeTest() {
final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("someTopic", Serdes.String(), Serdes.Integer()); final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("fkTopic", Serdes.String(),
"pkTopic", Serdes.Integer());
cks.toBytes(null, 10); cks.toBytes(null, 10);
} }
@Test @Test
public void prefixKeySerdeTest() { public void prefixKeySerdeTest() {
final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("someTopic", Serdes.String(), Serdes.Integer()); final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("fkTopic", Serdes.String(),
"pkTopic", Serdes.Integer());
final String foreignKey = "someForeignKey"; final String foreignKey = "someForeignKey";
final byte[] foreignKeySerializedData = Serdes.String().serializer().serialize("someTopic", foreignKey); final byte[] foreignKeySerializedData =
Serdes.String().serializer().serialize("fkTopic", foreignKey);
final Bytes prefix = cks.prefixBytes(foreignKey); final Bytes prefix = cks.prefixBytes(foreignKey);
final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + foreignKeySerializedData.length); final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + foreignKeySerializedData.length);
@ -66,7 +71,8 @@ public class CombinedKeySchemaTest {
@Test(expected = NullPointerException.class) @Test(expected = NullPointerException.class)
public void nullPrefixKeySerdeTest() { public void nullPrefixKeySerdeTest() {
final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("someTopic", Serdes.String(), Serdes.Integer()); final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("fkTopic", Serdes.String(),
"pkTopic", Serdes.Integer());
final String foreignKey = null; final String foreignKey = null;
cks.prefixBytes(foreignKey); cks.prefixBytes(foreignKey);
} }

8
streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java

@ -31,7 +31,7 @@ public class SubscriptionWrapperSerdeTest {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void shouldSerdeTest() { public void shouldSerdeTest() {
final String originalKey = "originalKey"; final String originalKey = "originalKey";
final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(Serdes.String()); final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>("pkTopic", Serdes.String());
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19}); final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, originalKey); final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, originalKey);
final byte[] serialized = swSerde.serializer().serialize(null, wrapper); final byte[] serialized = swSerde.serializer().serialize(null, wrapper);
@ -46,7 +46,7 @@ public class SubscriptionWrapperSerdeTest {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void shouldSerdeNullHashTest() { public void shouldSerdeNullHashTest() {
final String originalKey = "originalKey"; final String originalKey = "originalKey";
final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(Serdes.String()); final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>("pkTopic", Serdes.String());
final long[] hashedValue = null; final long[] hashedValue = null;
final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, originalKey); final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, originalKey);
final byte[] serialized = swSerde.serializer().serialize(null, wrapper); final byte[] serialized = swSerde.serializer().serialize(null, wrapper);
@ -61,7 +61,7 @@ public class SubscriptionWrapperSerdeTest {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void shouldThrowExceptionOnNullKeyTest() { public void shouldThrowExceptionOnNullKeyTest() {
final String originalKey = null; final String originalKey = null;
final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(Serdes.String()); final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>("pkTopic", Serdes.String());
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19}); final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, originalKey); final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, originalKey);
swSerde.serializer().serialize(null, wrapper); swSerde.serializer().serialize(null, wrapper);
@ -71,7 +71,7 @@ public class SubscriptionWrapperSerdeTest {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void shouldThrowExceptionOnNullInstructionTest() { public void shouldThrowExceptionOnNullInstructionTest() {
final String originalKey = "originalKey"; final String originalKey = "originalKey";
final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(Serdes.String()); final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>("pkTopic", Serdes.String());
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19}); final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, null, originalKey); final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, null, originalKey);
swSerde.serializer().serialize(null, wrapper); swSerde.serializer().serialize(null, wrapper);

148
streams/src/test/java/org/apache/kafka/streams/utils/UniqueTopicSerdeScope.java

@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.utils;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
public class UniqueTopicSerdeScope {
private final Map<String, Class<?>> topicTypeRegistry = new TreeMap<>();
public <T> UniqueTopicSerdeDecorator<T> decorateSerde(final Serde<T> delegate,
final Properties config,
final boolean isKey) {
final UniqueTopicSerdeDecorator<T> decorator = new UniqueTopicSerdeDecorator<>(delegate);
decorator.configure(config.entrySet().stream().collect(Collectors.toMap(e -> e.getKey().toString(), Map.Entry::getValue)), isKey);
return decorator;
}
public class UniqueTopicSerdeDecorator<T> implements Serde<T> {
private final AtomicBoolean isKey = new AtomicBoolean(false);
private final Serde<T> delegate;
public UniqueTopicSerdeDecorator(final Serde<T> delegate) {
this.delegate = delegate;
}
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
delegate.configure(configs, isKey);
this.isKey.set(isKey);
}
@Override
public void close() {
delegate.close();
}
@Override
public Serializer<T> serializer() {
return new UniqueTopicSerializerDecorator<>(isKey, delegate.serializer());
}
@Override
public Deserializer<T> deserializer() {
return new UniqueTopicDeserializerDecorator<>(isKey, delegate.deserializer());
}
}
public class UniqueTopicSerializerDecorator<T> implements Serializer<T> {
private final AtomicBoolean isKey;
private final Serializer<T> delegate;
public UniqueTopicSerializerDecorator(final AtomicBoolean isKey, final Serializer<T> delegate) {
this.isKey = isKey;
this.delegate = delegate;
}
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
delegate.configure(configs, isKey);
this.isKey.set(isKey);
}
@Override
public byte[] serialize(final String topic, final T data) {
verifyTopic(topic, data);
return delegate.serialize(topic, data);
}
@Override
public byte[] serialize(final String topic, final Headers headers, final T data) {
verifyTopic(topic, data);
return delegate.serialize(topic, headers, data);
}
private void verifyTopic(final String topic, final T data) {
if (data != null) {
final String key = topic + (isKey.get() ? "--key" : "--value");
if (topicTypeRegistry.containsKey(key)) {
assertThat(String.format("key[%s] data[%s][%s]", key, data, data.getClass()), topicTypeRegistry.get(key), equalTo(data.getClass()));
} else {
topicTypeRegistry.put(key, data.getClass());
}
}
}
@Override
public void close() {
delegate.close();
}
}
public class UniqueTopicDeserializerDecorator<T> implements Deserializer<T> {
private final AtomicBoolean isKey;
private final Deserializer<T> delegate;
public UniqueTopicDeserializerDecorator(final AtomicBoolean isKey, final Deserializer<T> delegate) {
this.isKey = isKey;
this.delegate = delegate;
}
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
delegate.configure(configs, isKey);
this.isKey.set(isKey);
}
@Override
public T deserialize(final String topic, final byte[] data) {
return delegate.deserialize(topic, data);
}
@Override
public T deserialize(final String topic, final Headers headers, final byte[] data) {
return delegate.deserialize(topic, headers, data);
}
@Override
public void close() {
delegate.close();
}
}
}
Loading…
Cancel
Save