Browse Source

KAFKA-9244: Update FK reference should unsubscribe old FK (#7758)

Reviewers: Adam Bellemare <adam.bellemare@wishabi.com>, John Roesler <john@confluent.io>
pull/7767/head
Matthias J. Sax 5 years ago committed by GitHub
parent
commit
ba02e8c6b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
  2. 71
      streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java

4
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java

@ -92,9 +92,9 @@ public class SubscriptionStoreReceiveProcessorSupplier<K, KO> @@ -92,9 +92,9 @@ public class SubscriptionStoreReceiveProcessorSupplier<K, KO>
final ValueAndTimestamp<SubscriptionWrapper<K>> newValue = ValueAndTimestamp.make(value, context().timestamp());
final ValueAndTimestamp<SubscriptionWrapper<K>> oldValue = store.get(subscriptionKey);
//If the subscriptionWrapper hash indicates a null, must delete from statestore.
//This store is used by the prefix scanner in ForeignJoinSubscriptionProcessorSupplier
if (value.getHash() == null) {
if (value.getInstruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE) ||
value.getInstruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE)) {
store.delete(subscriptionKey);
} else {
store.put(subscriptionKey, newValue);

71
streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java

@ -426,6 +426,77 @@ public class KTableKTableForeignKeyJoinIntegrationTest { @@ -426,6 +426,77 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
}
}
@Test
public void shouldUnsubscribeOldForeignKeyIfLeftSideIsUpdated() {
final Topology topology = getTopology(streamsConfig, "store", leftJoin);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
// Pre-populate the RHS records. This test is all about what happens when we change LHS records foreign key reference
// then populate update on RHS
right.pipeInput("rhs1", "rhsValue1");
right.pipeInput("rhs2", "rhsValue2");
assertThat(
outputTopic.readKeyValuesToMap(),
is(emptyMap())
);
assertThat(
asMap(store),
is(emptyMap())
);
left.pipeInput("lhs1", "lhsValue1|rhs1");
{
final Map<String, String> expected = mkMap(
mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
);
assertThat(
outputTopic.readKeyValuesToMap(),
is(expected)
);
assertThat(
asMap(store),
is(expected)
);
}
// Change LHS foreign key reference
left.pipeInput("lhs1", "lhsValue1|rhs2");
{
final Map<String, String> expected = mkMap(
mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
);
assertThat(
outputTopic.readKeyValuesToMap(),
is(expected)
);
assertThat(
asMap(store),
is(expected)
);
}
// Populate RHS update on old LHS foreign key ref
right.pipeInput("rhs1", "rhsValue1Delta");
{
assertThat(
outputTopic.readKeyValuesToMap(),
is(emptyMap())
);
assertThat(
asMap(store),
is(mkMap(
mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
))
);
}
}
}
private static Map<String, String> asMap(final KeyValueStore<String, String> store) {
final HashMap<String, String> result = new HashMap<>();
store.all().forEachRemaining(kv -> result.put(kv.key, kv.value));

Loading…
Cancel
Save