Browse Source

KAFKA-5660 Don't throw TopologyBuilderException during runtime (#4645)

Reviewers: Matthias J. Sax <mjsax@apache.org>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
pull/4657/head
nafshartous 7 years ago committed by Guozhang Wang
parent
commit
cf092aeecc
  1. 8
      streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
  2. 3
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
  3. 16
      streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
  4. 37
      streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
  5. 15
      streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java

8
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java

@ -22,6 +22,7 @@ import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.internals.ThreadCache; import org.apache.kafka.streams.state.internals.ThreadCache;
@ -54,13 +55,12 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
} }
/** /**
* @throws org.apache.kafka.streams.errors.TopologyBuilderException if an attempt is made to access this state store from an unknown node * @throws StreamsException if an attempt is made to access this state store from an unknown node
*/ */
@SuppressWarnings("deprecation")
@Override @Override
public StateStore getStateStore(final String name) { public StateStore getStateStore(final String name) {
if (currentNode() == null) { if (currentNode() == null) {
throw new org.apache.kafka.streams.errors.TopologyBuilderException("Accessing from an unknown node"); throw new StreamsException("Accessing from an unknown node");
} }
final StateStore global = stateManager.getGlobalStore(name); final StateStore global = stateManager.getGlobalStore(name);
@ -69,7 +69,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
} }
if (!currentNode().stateStores.contains(name)) { if (!currentNode().stateStores.contains(name)) {
throw new org.apache.kafka.streams.errors.TopologyBuilderException("Processor " + currentNode().name() + " has no access to StateStore " + name); throw new StreamsException("Processor " + currentNode().name() + " has no access to StateStore " + name);
} }
return stateManager.getStore(name); return stateManager.getStore(name);

3
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java

@ -28,6 +28,7 @@ import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskAssignmentException; import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.PartitionGrouper; import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TaskId;
@ -694,7 +695,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
continue; continue;
} }
if (numPartitions < 0) { if (numPartitions < 0) {
throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%sTopic [%s] number of partitions not defined", logPrefix, topic.name())); throw new StreamsException(String.format("%sTopic [%s] number of partitions not defined", logPrefix, topic.name()));
} }
topic.setNumberOfPartitions(numPartitions); topic.setNumberOfPartitions(numPartitions);

16
streams/src/test/java/org/apache/kafka/streams/TopologyTest.java

@ -17,7 +17,6 @@
package org.apache.kafka.streams; package org.apache.kafka.streams;
import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorContext;
@ -252,7 +251,7 @@ public class TopologyTest {
} catch (final TopologyException expected) { } } catch (final TopologyException expected) { }
} }
@Test(expected = TopologyBuilderException.class) @Test
public void shouldThrowOnUnassignedStateStoreAccess() throws Exception { public void shouldThrowOnUnassignedStateStoreAccess() throws Exception {
final String sourceNodeName = "source"; final String sourceNodeName = "source";
final String goodNodeName = "goodGuy"; final String goodNodeName = "goodGuy";
@ -276,15 +275,12 @@ public class TopologyTest {
try { try {
new ProcessorTopologyTestDriver(streamsConfig, topology.internalTopologyBuilder); new ProcessorTopologyTestDriver(streamsConfig, topology.internalTopologyBuilder);
fail("Should have thrown StreamsException");
} catch (final StreamsException e) { } catch (final StreamsException e) {
final Throwable cause = e.getCause(); final String error = e.toString();
if (cause != null final String expectedMessage = "org.apache.kafka.streams.errors.StreamsException: failed to initialize processor " + badNodeName;
&& cause instanceof TopologyBuilderException
&& cause.getMessage().equals("Invalid topology building: Processor " + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME)) { assertThat(error, equalTo(expectedMessage));
throw (TopologyBuilderException) cause;
} else {
throw new RuntimeException("Did expect different exception. Did catch:", e);
}
} }
} }

37
streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java

@ -51,6 +51,7 @@ import java.util.regex.Pattern;
import static org.apache.kafka.common.utils.Utils.mkList; import static org.apache.kafka.common.utils.Utils.mkList;
import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
@ -590,8 +591,7 @@ public class TopologyBuilderTest {
assertEquals("appId-foo", topicConfig.name()); assertEquals("appId-foo", topicConfig.name());
} }
@Test
@Test(expected = TopologyBuilderException.class)
public void shouldThroughOnUnassignedStateStoreAccess() throws Exception { public void shouldThroughOnUnassignedStateStoreAccess() throws Exception {
final String sourceNodeName = "source"; final String sourceNodeName = "source";
final String goodNodeName = "goodGuy"; final String goodNodeName = "goodGuy";
@ -603,27 +603,24 @@ public class TopologyBuilderTest {
config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
final StreamsConfig streamsConfig = new StreamsConfig(config); final StreamsConfig streamsConfig = new StreamsConfig(config);
try { final TopologyBuilder builder = new TopologyBuilder();
final TopologyBuilder builder = new TopologyBuilder(); builder.addSource(sourceNodeName, "topic")
builder .addProcessor(goodNodeName, new LocalMockProcessorSupplier(),
.addSource(sourceNodeName, "topic") sourceNodeName)
.addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName)
.addStateStore( .addStateStore(
Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(), Stores.create(LocalMockProcessorSupplier.STORE_NAME)
goodNodeName) .withStringKeys().withStringValues().inMemory()
.addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName); .build(), goodNodeName)
.addProcessor(badNodeName, new LocalMockProcessorSupplier(),
sourceNodeName);
try {
final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder.internalTopologyBuilder); final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder.internalTopologyBuilder);
driver.process("topic", null, null); fail("Should have thrown StreamsException");
} catch (final StreamsException e) { } catch (final StreamsException e) {
final Throwable cause = e.getCause(); final String error = e.toString();
if (cause != null final String expectedMessage = "org.apache.kafka.streams.errors.StreamsException: failed to initialize processor " + badNodeName;
&& cause instanceof TopologyBuilderException
&& cause.getMessage().equals("Invalid topology building: Processor " + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME)) { assertThat(error, equalTo(expectedMessage));
throw (TopologyBuilderException) cause;
} else {
throw new RuntimeException("Did expect different exception. Did catch:", e);
}
} }
} }

15
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java

@ -23,7 +23,6 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription; import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorContext;
@ -53,7 +52,9 @@ import java.util.regex.Pattern;
import static org.apache.kafka.common.utils.Utils.mkList; import static org.apache.kafka.common.utils.Utils.mkList;
import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
@ -582,13 +583,11 @@ public class InternalTopologyBuilderTest {
try { try {
new ProcessorTopologyTestDriver(streamsConfig, builder); new ProcessorTopologyTestDriver(streamsConfig, builder);
fail("Should have throw StreamsException"); fail("Should have throw StreamsException");
} catch (final StreamsException expected) { } catch (final StreamsException e) {
final Throwable cause = expected.getCause(); final String error = e.toString();
if (cause == null final String expectedMessage = "org.apache.kafka.streams.errors.StreamsException: failed to initialize processor " + badNodeName;
|| !(cause instanceof TopologyBuilderException)
|| !cause.getMessage().equals("Invalid topology building: Processor " + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME)) { assertThat(error, equalTo(expectedMessage));
throw new RuntimeException("Did expect different exception. Did catch:", expected);
}
} }
} }

Loading…
Cancel
Save