Browse Source

KAFKA-8240: Fix NPE in Source.equals() (#6589)

Reviewers: John Roesler <john@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
pull/6522/merge
Matthias J. Sax 6 years ago committed by Bill Bejeck
parent
commit
6d3ff132b5
  1. 25
      streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
  2. 133
      streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java

25
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java

@ -282,7 +282,7 @@ public class InternalTopologyBuilder {
@Override @Override
Source describe() { Source describe() {
return new Source(name, new HashSet<>(topics), pattern); return new Source(name, topics.size() == 0 ? null : new HashSet<>(topics), pattern);
} }
} }
@ -1281,6 +1281,9 @@ public class InternalTopologyBuilder {
@Override @Override
public int compare(final TopologyDescription.Node node1, public int compare(final TopologyDescription.Node node1,
final TopologyDescription.Node node2) { final TopologyDescription.Node node2) {
if (node1.equals(node2)) {
return 0;
}
final int size1 = ((AbstractNode) node1).size; final int size1 = ((AbstractNode) node1).size;
final int size2 = ((AbstractNode) node2).size; final int size2 = ((AbstractNode) node2).size;
@ -1399,6 +1402,7 @@ public class InternalTopologyBuilder {
int size; int size;
AbstractNode(final String name) { AbstractNode(final String name) {
Objects.requireNonNull(name, "name cannot be null");
this.name = name; this.name = name;
this.size = 1; this.size = 1;
} }
@ -1435,6 +1439,13 @@ public class InternalTopologyBuilder {
final Set<String> topics, final Set<String> topics,
final Pattern pattern) { final Pattern pattern) {
super(name); super(name);
if (topics == null && pattern == null) {
throw new IllegalArgumentException("Either topics or pattern must be not-null, but both are null.");
}
if (topics != null && pattern != null) {
throw new IllegalArgumentException("Either topics or pattern must be null, but both are not null.");
}
this.topics = topics; this.topics = topics;
this.topicPattern = pattern; this.topicPattern = pattern;
} }
@ -1479,8 +1490,10 @@ public class InternalTopologyBuilder {
final Source source = (Source) o; final Source source = (Source) o;
// omit successor to avoid infinite loops // omit successor to avoid infinite loops
return name.equals(source.name) return name.equals(source.name)
&& topics.equals(source.topics) && (topics == null && source.topics == null
&& topicPattern.equals(source.topicPattern); || topics != null && topics.equals(source.topics))
&& (topicPattern == null && source.topicPattern == null
|| topicPattern != null && topicPattern.pattern().equals(source.topicPattern.pattern()));
} }
@Override @Override
@ -1709,6 +1722,9 @@ public class InternalTopologyBuilder {
@Override @Override
public int compare(final TopologyDescription.GlobalStore globalStore1, public int compare(final TopologyDescription.GlobalStore globalStore1,
final TopologyDescription.GlobalStore globalStore2) { final TopologyDescription.GlobalStore globalStore2) {
if (globalStore1.equals(globalStore2)) {
return 0;
}
return globalStore1.id() - globalStore2.id(); return globalStore1.id() - globalStore2.id();
} }
} }
@ -1719,6 +1735,9 @@ public class InternalTopologyBuilder {
@Override @Override
public int compare(final TopologyDescription.Subtopology subtopology1, public int compare(final TopologyDescription.Subtopology subtopology1,
final TopologyDescription.Subtopology subtopology2) { final TopologyDescription.Subtopology subtopology2) {
if (subtopology1.equals(subtopology2)) {
return 0;
}
return subtopology1.id() - subtopology2.id(); return subtopology1.id() - subtopology2.id();
} }
} }

133
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java

@ -23,15 +23,13 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription; import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockKeyValueStoreBuilder; import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockTimestampExtractor; import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.StreamsTestUtils; import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test; import org.junit.Test;
@ -50,12 +48,14 @@ import java.util.regex.Pattern;
import static java.time.Duration.ofSeconds; import static java.time.Duration.ofSeconds;
import static java.util.Arrays.asList; import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -353,9 +353,9 @@ public class InternalTopologyBuilderTest {
final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups(); final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
final Map<Integer, InternalTopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>(); final Map<Integer, InternalTopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>();
expectedTopicGroups.put(0, new InternalTopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap())); expectedTopicGroups.put(0, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.emptyMap(), Collections.emptyMap()));
expectedTopicGroups.put(1, new InternalTopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap())); expectedTopicGroups.put(1, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-3", "topic-4"), Collections.emptyMap(), Collections.emptyMap()));
expectedTopicGroups.put(2, new InternalTopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap())); expectedTopicGroups.put(2, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-5"), Collections.emptyMap(), Collections.emptyMap()));
assertEquals(3, topicGroups.size()); assertEquals(3, topicGroups.size());
assertEquals(expectedTopicGroups, topicGroups); assertEquals(expectedTopicGroups, topicGroups);
@ -393,17 +393,17 @@ public class InternalTopologyBuilderTest {
final String store2 = ProcessorStateManager.storeChangelogTopic("X", "store-2"); final String store2 = ProcessorStateManager.storeChangelogTopic("X", "store-2");
final String store3 = ProcessorStateManager.storeChangelogTopic("X", "store-3"); final String store3 = ProcessorStateManager.storeChangelogTopic("X", "store-3");
expectedTopicGroups.put(0, new InternalTopologyBuilder.TopicsInfo( expectedTopicGroups.put(0, new InternalTopologyBuilder.TopicsInfo(
Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.emptySet(), mkSet("topic-1", "topic-1x", "topic-2"),
Collections.<String, InternalTopicConfig>emptyMap(), Collections.emptyMap(),
Collections.singletonMap(store1, (InternalTopicConfig) new UnwindowedChangelogTopicConfig(store1, Collections.<String, String>emptyMap())))); Collections.singletonMap(store1, new UnwindowedChangelogTopicConfig(store1, Collections.emptyMap()))));
expectedTopicGroups.put(1, new InternalTopologyBuilder.TopicsInfo( expectedTopicGroups.put(1, new InternalTopologyBuilder.TopicsInfo(
Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.emptySet(), mkSet("topic-3", "topic-4"),
Collections.<String, InternalTopicConfig>emptyMap(), Collections.emptyMap(),
Collections.singletonMap(store2, (InternalTopicConfig) new UnwindowedChangelogTopicConfig(store2, Collections.<String, String>emptyMap())))); Collections.singletonMap(store2, new UnwindowedChangelogTopicConfig(store2, Collections.emptyMap()))));
expectedTopicGroups.put(2, new InternalTopologyBuilder.TopicsInfo( expectedTopicGroups.put(2, new InternalTopologyBuilder.TopicsInfo(
Collections.<String>emptySet(), mkSet("topic-5"), Collections.emptySet(), mkSet("topic-5"),
Collections.<String, InternalTopicConfig>emptyMap(), Collections.emptyMap(),
Collections.singletonMap(store3, (InternalTopicConfig) new UnwindowedChangelogTopicConfig(store3, Collections.<String, String>emptyMap())))); Collections.singletonMap(store3, new UnwindowedChangelogTopicConfig(store3, Collections.emptyMap()))));
assertEquals(3, topicGroups.size()); assertEquals(3, topicGroups.size());
assertEquals(expectedTopicGroups, topicGroups); assertEquals(expectedTopicGroups, topicGroups);
@ -499,12 +499,7 @@ public class InternalTopologyBuilderTest {
@Test(expected = NullPointerException.class) @Test(expected = NullPointerException.class)
public void shouldNotAllowNullNameWhenAddingProcessor() { public void shouldNotAllowNullNameWhenAddingProcessor() {
builder.addProcessor(null, new ProcessorSupplier() { builder.addProcessor(null, () -> null);
@Override
public Processor get() {
return null;
}
});
} }
@Test(expected = NullPointerException.class) @Test(expected = NullPointerException.class)
@ -604,14 +599,14 @@ public class InternalTopologyBuilderTest {
final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups(); final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next(); final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
final InternalTopicConfig topicConfig1 = topicsInfo.stateChangelogTopics.get("appId-store1-changelog"); final InternalTopicConfig topicConfig1 = topicsInfo.stateChangelogTopics.get("appId-store1-changelog");
final Map<String, String> properties1 = topicConfig1.getProperties(Collections.<String, String>emptyMap(), 10000); final Map<String, String> properties1 = topicConfig1.getProperties(Collections.emptyMap(), 10000);
assertEquals(2, properties1.size()); assertEquals(2, properties1.size());
assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, properties1.get(TopicConfig.CLEANUP_POLICY_CONFIG)); assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, properties1.get(TopicConfig.CLEANUP_POLICY_CONFIG));
assertEquals("40000", properties1.get(TopicConfig.RETENTION_MS_CONFIG)); assertEquals("40000", properties1.get(TopicConfig.RETENTION_MS_CONFIG));
assertEquals("appId-store1-changelog", topicConfig1.name()); assertEquals("appId-store1-changelog", topicConfig1.name());
assertTrue(topicConfig1 instanceof WindowedChangelogTopicConfig); assertTrue(topicConfig1 instanceof WindowedChangelogTopicConfig);
final InternalTopicConfig topicConfig2 = topicsInfo.stateChangelogTopics.get("appId-store2-changelog"); final InternalTopicConfig topicConfig2 = topicsInfo.stateChangelogTopics.get("appId-store2-changelog");
final Map<String, String> properties2 = topicConfig2.getProperties(Collections.<String, String>emptyMap(), 10000); final Map<String, String> properties2 = topicConfig2.getProperties(Collections.emptyMap(), 10000);
assertEquals(2, properties2.size()); assertEquals(2, properties2.size());
assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, properties2.get(TopicConfig.CLEANUP_POLICY_CONFIG)); assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, properties2.get(TopicConfig.CLEANUP_POLICY_CONFIG));
assertEquals("40000", properties2.get(TopicConfig.RETENTION_MS_CONFIG)); assertEquals("40000", properties2.get(TopicConfig.RETENTION_MS_CONFIG));
@ -628,7 +623,7 @@ public class InternalTopologyBuilderTest {
final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups(); final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next(); final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog"); final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000); final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 10000);
assertEquals(1, properties.size()); assertEquals(1, properties.size());
assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG)); assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG));
assertEquals("appId-store-changelog", topicConfig.name()); assertEquals("appId-store-changelog", topicConfig.name());
@ -642,7 +637,7 @@ public class InternalTopologyBuilderTest {
builder.addSource(null, "source", null, null, null, "foo"); builder.addSource(null, "source", null, null, null, "foo");
final InternalTopologyBuilder.TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next(); final InternalTopologyBuilder.TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo"); final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo");
final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000); final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 10000);
assertEquals(3, properties.size()); assertEquals(3, properties.size());
assertEquals(String.valueOf(-1), properties.get(TopicConfig.RETENTION_MS_CONFIG)); assertEquals(String.valueOf(-1), properties.get(TopicConfig.RETENTION_MS_CONFIG));
assertEquals(TopicConfig.CLEANUP_POLICY_DELETE, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG)); assertEquals(TopicConfig.CLEANUP_POLICY_DELETE, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG));
@ -708,32 +703,32 @@ public class InternalTopologyBuilderTest {
assertTrue(iterator.hasNext()); assertTrue(iterator.hasNext());
InternalTopologyBuilder.AbstractNode node = (InternalTopologyBuilder.AbstractNode) iterator.next(); InternalTopologyBuilder.AbstractNode node = (InternalTopologyBuilder.AbstractNode) iterator.next();
assertTrue(node.name.equals("source1")); assertEquals("source1", node.name);
assertEquals(6, node.size); assertEquals(6, node.size);
assertTrue(iterator.hasNext()); assertTrue(iterator.hasNext());
node = (InternalTopologyBuilder.AbstractNode) iterator.next(); node = (InternalTopologyBuilder.AbstractNode) iterator.next();
assertTrue(node.name.equals("source2")); assertEquals("source2", node.name);
assertEquals(4, node.size); assertEquals(4, node.size);
assertTrue(iterator.hasNext()); assertTrue(iterator.hasNext());
node = (InternalTopologyBuilder.AbstractNode) iterator.next(); node = (InternalTopologyBuilder.AbstractNode) iterator.next();
assertTrue(node.name.equals("processor2")); assertEquals("processor2", node.name);
assertEquals(3, node.size); assertEquals(3, node.size);
assertTrue(iterator.hasNext()); assertTrue(iterator.hasNext());
node = (InternalTopologyBuilder.AbstractNode) iterator.next(); node = (InternalTopologyBuilder.AbstractNode) iterator.next();
assertTrue(node.name.equals("processor1")); assertEquals("processor1", node.name);
assertEquals(2, node.size); assertEquals(2, node.size);
assertTrue(iterator.hasNext()); assertTrue(iterator.hasNext());
node = (InternalTopologyBuilder.AbstractNode) iterator.next(); node = (InternalTopologyBuilder.AbstractNode) iterator.next();
assertTrue(node.name.equals("processor3")); assertEquals("processor3", node.name);
assertEquals(2, node.size); assertEquals(2, node.size);
assertTrue(iterator.hasNext()); assertTrue(iterator.hasNext());
node = (InternalTopologyBuilder.AbstractNode) iterator.next(); node = (InternalTopologyBuilder.AbstractNode) iterator.next();
assertTrue(node.name.equals("sink1")); assertEquals("sink1", node.name);
assertEquals(1, node.size); assertEquals(1, node.size);
} }
@ -760,7 +755,7 @@ public class InternalTopologyBuilderTest {
final Map<String, List<String>> stateStoreAndTopics = builder.stateStoreNameToSourceTopics(); final Map<String, List<String>> stateStoreAndTopics = builder.stateStoreNameToSourceTopics();
final List<String> topics = stateStoreAndTopics.get(storeBuilder.name()); final List<String> topics = stateStoreAndTopics.get(storeBuilder.name());
assertTrue("Expected to contain two topics", topics.size() == 2); assertEquals("Expected to contain two topics", 2, topics.size());
assertTrue(topics.contains("topic-2")); assertTrue(topics.contains("topic-2"));
assertTrue(topics.contains("topic-3")); assertTrue(topics.contains("topic-3"));
@ -781,4 +776,74 @@ public class InternalTopologyBuilderTest {
sameNameForSourceAndProcessor, sameNameForSourceAndProcessor,
new MockProcessorSupplier()); new MockProcessorSupplier());
} }
@Test
public void shouldThrowIfNameIsNull() {
final Exception e = assertThrows(NullPointerException.class, () -> new InternalTopologyBuilder.Source(null, Collections.emptySet(), null));
assertEquals("name cannot be null", e.getMessage());
}
@Test
public void shouldThrowIfTopicAndPatternAreNull() {
final Exception e = assertThrows(IllegalArgumentException.class, () -> new InternalTopologyBuilder.Source("name", null, null));
assertEquals("Either topics or pattern must be not-null, but both are null.", e.getMessage());
}
@Test
public void shouldThrowIfBothTopicAndPatternAreNotNull() {
final Exception e = assertThrows(IllegalArgumentException.class, () -> new InternalTopologyBuilder.Source("name", Collections.emptySet(), Pattern.compile("")));
assertEquals("Either topics or pattern must be null, but both are not null.", e.getMessage());
}
@Test
public void sourceShouldBeEqualIfNameAndTopicListAreTheSame() {
final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name", Collections.singleton("topic"), null);
final InternalTopologyBuilder.Source sameAsBase = new InternalTopologyBuilder.Source("name", Collections.singleton("topic"), null);
assertThat(base, equalTo(sameAsBase));
}
@Test
public void sourceShouldBeEqualIfNameAndPatternAreTheSame() {
final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name", null, Pattern.compile("topic"));
final InternalTopologyBuilder.Source sameAsBase = new InternalTopologyBuilder.Source("name", null, Pattern.compile("topic"));
assertThat(base, equalTo(sameAsBase));
}
@Test
public void sourceShouldNotBeEqualForDifferentNamesWithSameTopicList() {
final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name", Collections.singleton("topic"), null);
final InternalTopologyBuilder.Source differentName = new InternalTopologyBuilder.Source("name2", Collections.singleton("topic"), null);
assertThat(base, not(equalTo(differentName)));
}
@Test
public void sourceShouldNotBeEqualForDifferentNamesWithSamePattern() {
final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name", null, Pattern.compile("topic"));
final InternalTopologyBuilder.Source differentName = new InternalTopologyBuilder.Source("name2", null, Pattern.compile("topic"));
assertThat(base, not(equalTo(differentName)));
}
@Test
public void sourceShouldNotBeEqualForDifferentTopicList() {
final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name", Collections.singleton("topic"), null);
final InternalTopologyBuilder.Source differentTopicList = new InternalTopologyBuilder.Source("name", Collections.emptySet(), null);
final InternalTopologyBuilder.Source differentTopic = new InternalTopologyBuilder.Source("name", Collections.singleton("topic2"), null);
assertThat(base, not(equalTo(differentTopicList)));
assertThat(base, not(equalTo(differentTopic)));
}
@Test
public void sourceShouldNotBeEqualForDifferentPattern() {
final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name", null, Pattern.compile("topic"));
final InternalTopologyBuilder.Source differentPattern = new InternalTopologyBuilder.Source("name", null, Pattern.compile("topic2"));
final InternalTopologyBuilder.Source overlappingPattern = new InternalTopologyBuilder.Source("name", null, Pattern.compile("top*"));
assertThat(base, not(equalTo(differentPattern)));
assertThat(base, not(equalTo(overlappingPattern)));
}
} }

Loading…
Cancel
Save