Browse Source

KAFKA-5226; Fixes issue where adding topics matching a regex

subscribed stream may not be detected by all followers until
onJoinComplete returns.

Author: Bill Bejeck <bill@confluent.io>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3157 from bbejeck/KAFKA-5226_null_pointer_source_node_deserialize
pull/2259/merge
Bill Bejeck 8 years ago committed by Guozhang Wang
parent
commit
6360e04e70
  1. 4
      streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
  2. 33
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
  3. 1
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
  4. 6
      streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
  5. 15
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
  6. 91
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java

4
streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java

@ -1532,6 +1532,10 @@ public class TopologyBuilder { @@ -1532,6 +1532,10 @@ public class TopologyBuilder {
return applicationId + "-" + topic;
}
public SubscriptionUpdates subscriptionUpdates() {
return subscriptionUpdates;
}
public synchronized Pattern sourceTopicPattern() {
if (this.topicPattern == null) {
final List<String> allSourceTopics = new ArrayList<>();

33
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java

@ -253,17 +253,23 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable @@ -253,17 +253,23 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
standbyTasks.removeAll(previousActiveTasks);
SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, previousActiveTasks, standbyTasks, this.userEndPoint);
if (streamThread.builder.sourceTopicPattern() != null) {
SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
log.debug("stream-thread [{}] found {} topics possibly matching regex", streamThread.getName(), topics);
// update the topic groups with the returned subscription set for regex pattern subscriptions
subscriptionUpdates.updateTopics(topics);
streamThread.builder.updateSubscriptions(subscriptionUpdates, streamThread.getName());
if (streamThread.builder.sourceTopicPattern() != null &&
!streamThread.builder.subscriptionUpdates().getUpdates().equals(topics)) {
updateSubscribedTopics(topics);
}
return new Subscription(new ArrayList<>(topics), data.encode());
}
private void updateSubscribedTopics(Set<String> topics) {
SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
log.debug("stream-thread [{}] found {} topics possibly matching regex",
streamThread.getName(), topics);
// update the topic groups with the returned subscription set for regex pattern subscriptions
subscriptionUpdates.updateTopics(topics);
streamThread.builder.updateSubscriptions(subscriptionUpdates, streamThread.getName());
}
/*
* This assigns tasks to consumer clients in the following steps.
*
@ -606,6 +612,21 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable @@ -606,6 +612,21 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
}
}
metadataWithInternalTopics = Cluster.empty().withPartitions(topicToPartitionInfo);
checkForNewTopicAssignments(assignment);
}
private void checkForNewTopicAssignments(Assignment assignment) {
if (streamThread.builder.sourceTopicPattern() != null) {
final Set<String> assignedTopics = new HashSet<>();
for (final TopicPartition topicPartition : assignment.partitions()) {
assignedTopics.add(topicPartition.topic());
}
if (!streamThread.builder.subscriptionUpdates().getUpdates().containsAll(assignedTopics)) {
assignedTopics.addAll(streamThread.builder.subscriptionUpdates().getUpdates());
updateSubscribedTopics(assignedTopics);
}
}
}
/**

1
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

@ -1456,5 +1456,4 @@ public class StreamThread extends Thread { @@ -1456,5 +1456,4 @@ public class StreamThread extends Thread {
return firstException;
}
}

6
streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java

@ -27,8 +27,8 @@ import org.apache.kafka.streams.processor.internals.InternalTopicManager; @@ -27,8 +27,8 @@ import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
@ -681,7 +681,7 @@ public class TopologyBuilderTest { @@ -681,7 +681,7 @@ public class TopologyBuilderTest {
builder.addSource("source-2", Pattern.compile("topic-[A-C]"));
builder.addSource("source-3", Pattern.compile("topic-\\d"));
StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
updatedTopicsField.setAccessible(true);
@ -766,7 +766,7 @@ public class TopologyBuilderTest { @@ -766,7 +766,7 @@ public class TopologyBuilderTest {
.addProcessor("my-processor", new MockProcessorSupplier(), "ingest")
.addStateStore(new MockStateStoreSupplier("testStateStore", false), "my-processor");
final StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
final SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
final Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
updatedTopicsField.setAccessible(true);

15
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java

@ -24,6 +24,7 @@ import org.apache.kafka.common.PartitionInfo; @@ -24,6 +24,7 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
@ -44,6 +45,7 @@ import org.apache.kafka.test.MockProcessorSupplier; @@ -44,6 +45,7 @@ import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
@ -104,6 +106,12 @@ public class StreamPartitionAssignorTest { @@ -104,6 +106,12 @@ public class StreamPartitionAssignorTest {
private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
private final TopologyBuilder builder = new TopologyBuilder();
private final StreamsConfig config = new StreamsConfig(configProps());
private final StreamThread mockStreamThread = new StreamThread(builder, config,
mockClientSupplier, "appID",
"clientId", UUID.randomUUID(),
new Metrics(), new MockTime(),
null, 1L);
private final Map<String, Object> configurationMap = new HashMap<>();
private Properties configProps() {
return new Properties() {
@ -116,6 +124,13 @@ public class StreamPartitionAssignorTest { @@ -116,6 +124,13 @@ public class StreamPartitionAssignorTest {
};
}
@Before
public void setUp() {
configurationMap.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, mockStreamThread);
configurationMap.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 0);
partitionAssignor.configure(configurationMap);
}
@SuppressWarnings("unchecked")
@Test
public void testSubscription() throws Exception {

91
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java

@ -36,6 +36,9 @@ import org.apache.kafka.streams.StreamsConfig; @@ -36,6 +36,9 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
@ -52,6 +55,7 @@ import java.io.File; @@ -52,6 +55,7 @@ import java.io.File;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@ -1499,6 +1503,93 @@ public class StreamThreadTest { @@ -1499,6 +1503,93 @@ public class StreamThreadTest {
assertFalse(testStreamTask.committed);
}
@Test
@SuppressWarnings("unchecked")
public void shouldAlwaysUpdateWithLatestTopicsFromStreamPartitionAssignor() throws Exception {
final TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.addSource("source", Pattern.compile("t.*"));
topologyBuilder.addProcessor("processor", new MockProcessorSupplier(), "source");
final StreamThread thread = new StreamThread(
topologyBuilder,
config,
clientSupplier,
applicationId,
clientId,
processId,
metrics,
mockTime,
new StreamsMetadataState(topologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
0);
final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
final Map<String, Object> configurationMap = new HashMap<>();
configurationMap.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, thread);
configurationMap.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 0);
partitionAssignor.configure(configurationMap);
thread.setPartitionAssignor(partitionAssignor);
final Field
nodeToSourceTopicsField =
topologyBuilder.getClass().getDeclaredField("nodeToSourceTopics");
nodeToSourceTopicsField.setAccessible(true);
final Map<String, List<String>>
nodeToSourceTopics =
(Map<String, List<String>>) nodeToSourceTopicsField.get(topologyBuilder);
final List<TopicPartition> topicPartitions = new ArrayList<>();
final TopicPartition topicPartition1 = new TopicPartition("topic-1", 0);
final TopicPartition topicPartition2 = new TopicPartition("topic-2", 0);
final TopicPartition topicPartition3 = new TopicPartition("topic-3", 0);
final TaskId taskId1 = new TaskId(0, 0);
final TaskId taskId2 = new TaskId(0, 0);
final TaskId taskId3 = new TaskId(0, 0);
List<TaskId> activeTasks = Arrays.asList(taskId1);
final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
AssignmentInfo info = new AssignmentInfo(activeTasks, standbyTasks, new HashMap<HostInfo, Set<TopicPartition>>());
topicPartitions.addAll(Arrays.asList(topicPartition1));
PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(topicPartitions, info.encode());
partitionAssignor.onAssignment(assignment);
assertTrue(nodeToSourceTopics.get("source").size() == 1);
assertTrue(nodeToSourceTopics.get("source").contains("topic-1"));
topicPartitions.clear();
activeTasks = Arrays.asList(taskId1, taskId2);
info = new AssignmentInfo(activeTasks, standbyTasks, new HashMap<HostInfo, Set<TopicPartition>>());
topicPartitions.addAll(Arrays.asList(topicPartition1, topicPartition2));
assignment = new PartitionAssignor.Assignment(topicPartitions, info.encode());
partitionAssignor.onAssignment(assignment);
assertTrue(nodeToSourceTopics.get("source").size() == 2);
assertTrue(nodeToSourceTopics.get("source").contains("topic-1"));
assertTrue(nodeToSourceTopics.get("source").contains("topic-2"));
topicPartitions.clear();
activeTasks = Arrays.asList(taskId1, taskId2, taskId3);
info = new AssignmentInfo(activeTasks, standbyTasks,
new HashMap<HostInfo, Set<TopicPartition>>());
topicPartitions.addAll(Arrays.asList(topicPartition1, topicPartition2, topicPartition3));
assignment = new PartitionAssignor.Assignment(topicPartitions, info.encode());
partitionAssignor.onAssignment(assignment);
assertTrue(nodeToSourceTopics.get("source").size() == 3);
assertTrue(nodeToSourceTopics.get("source").contains("topic-1"));
assertTrue(nodeToSourceTopics.get("source").contains("topic-2"));
assertTrue(nodeToSourceTopics.get("source").contains("topic-3"));
}
private void initPartitionGrouper(final StreamsConfig config,
final StreamThread thread,
final MockClientSupplier clientSupplier) {

Loading…
Cancel
Save