From ba02e8c6b6802262646a7d6287c7a2c237be65fd Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 29 Nov 2019 21:21:06 -0800 Subject: [PATCH] KAFKA-9244: Update FK reference should unsubscribe old FK (#7758) Reviewers: Adam Bellemare , John Roesler --- ...criptionStoreReceiveProcessorSupplier.java | 4 +- ...leKTableForeignKeyJoinIntegrationTest.java | 71 +++++++++++++++++++ 2 files changed, 73 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java index 0a86980adef..9cbeaddc78f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java @@ -92,9 +92,9 @@ public class SubscriptionStoreReceiveProcessorSupplier final ValueAndTimestamp> newValue = ValueAndTimestamp.make(value, context().timestamp()); final ValueAndTimestamp> oldValue = store.get(subscriptionKey); - //If the subscriptionWrapper hash indicates a null, must delete from statestore. //This store is used by the prefix scanner in ForeignJoinSubscriptionProcessorSupplier - if (value.getHash() == null) { + if (value.getInstruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE) || + value.getInstruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE)) { store.delete(subscriptionKey); } else { store.put(subscriptionKey, newValue); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java index 80c0f524e4f..746d6b3195b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java @@ -426,6 +426,77 @@ public class KTableKTableForeignKeyJoinIntegrationTest { } } + @Test + public void shouldUnsubscribeOldForeignKeyIfLeftSideIsUpdated() { + final Topology topology = getTopology(streamsConfig, "store", leftJoin); + try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { + final TestInputTopic right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); + final TestInputTopic left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); + final TestOutputTopic outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); + final KeyValueStore store = driver.getKeyValueStore("store"); + + // Pre-populate the RHS records. This test is all about what happens when we change LHS records foreign key reference + // then populate update on RHS + right.pipeInput("rhs1", "rhsValue1"); + right.pipeInput("rhs2", "rhsValue2"); + + assertThat( + outputTopic.readKeyValuesToMap(), + is(emptyMap()) + ); + assertThat( + asMap(store), + is(emptyMap()) + ); + + left.pipeInput("lhs1", "lhsValue1|rhs1"); + { + final Map expected = mkMap( + mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)") + ); + assertThat( + outputTopic.readKeyValuesToMap(), + is(expected) + ); + assertThat( + asMap(store), + is(expected) + ); + } + + // Change LHS foreign key reference + left.pipeInput("lhs1", "lhsValue1|rhs2"); + { + final Map expected = mkMap( + mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") + ); + assertThat( + outputTopic.readKeyValuesToMap(), + is(expected) + ); + assertThat( + asMap(store), + is(expected) + ); + } + + // Populate RHS update on old LHS foreign key ref + right.pipeInput("rhs1", "rhsValue1Delta"); + { + assertThat( + outputTopic.readKeyValuesToMap(), + is(emptyMap()) + ); + assertThat( + asMap(store), + is(mkMap( + mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") + )) + ); + } + } + } + private static Map asMap(final KeyValueStore store) { final HashMap result = new HashMap<>(); store.all().forEachRemaining(kv -> result.put(kv.key, kv.value));