From cf092aeecc473b70d81c00b604e29de8c9f6d84b Mon Sep 17 00:00:00 2001 From: nafshartous Date: Tue, 6 Mar 2018 20:49:33 -0500 Subject: [PATCH] KAFKA-5660 Don't throw TopologyBuilderException during runtime (#4645) Reviewers: Matthias J. Sax , Bill Bejeck , Guozhang Wang --- .../internals/ProcessorContextImpl.java | 8 ++-- .../internals/StreamsPartitionAssignor.java | 3 +- .../apache/kafka/streams/TopologyTest.java | 16 +++----- .../processor/TopologyBuilderTest.java | 37 +++++++++---------- .../InternalTopologyBuilderTest.java | 17 ++++----- 5 files changed, 37 insertions(+), 44 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 317581a6beb..42d3d70e396 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.state.internals.ThreadCache; @@ -54,13 +55,12 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re } /** - * @throws org.apache.kafka.streams.errors.TopologyBuilderException if an attempt is made to access this state store from an unknown node + * @throws StreamsException if an attempt is made to access this state store from an unknown node */ - @SuppressWarnings("deprecation") @Override public StateStore getStateStore(final String name) { if (currentNode() == null) { - throw new org.apache.kafka.streams.errors.TopologyBuilderException("Accessing from an unknown node"); + throw new StreamsException("Accessing from an unknown node"); } final StateStore global = stateManager.getGlobalStore(name); @@ -69,7 +69,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re } if (!currentNode().stateStores.contains(name)) { - throw new org.apache.kafka.streams.errors.TopologyBuilderException("Processor " + currentNode().name() + " has no access to StateStore " + name); + throw new StreamsException("Processor " + currentNode().name() + " has no access to StateStore " + name); } return stateManager.getStore(name); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 71a84b2ca73..0edbe2f523d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskAssignmentException; import org.apache.kafka.streams.processor.PartitionGrouper; import org.apache.kafka.streams.processor.TaskId; @@ -694,7 +695,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable continue; } if (numPartitions < 0) { - throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%sTopic [%s] number of partitions not defined", logPrefix, topic.name())); + throw new StreamsException(String.format("%sTopic [%s] number of partitions not defined", logPrefix, topic.name())); } topic.setNumberOfPartitions(numPartitions); diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java index 992ffd818ff..68340910c2b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams; import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; @@ -252,7 +251,7 @@ public class TopologyTest { } catch (final TopologyException expected) { } } - @Test(expected = TopologyBuilderException.class) + @Test public void shouldThrowOnUnassignedStateStoreAccess() throws Exception { final String sourceNodeName = "source"; final String goodNodeName = "goodGuy"; @@ -276,15 +275,12 @@ public class TopologyTest { try { new ProcessorTopologyTestDriver(streamsConfig, topology.internalTopologyBuilder); + fail("Should have thrown StreamsException"); } catch (final StreamsException e) { - final Throwable cause = e.getCause(); - if (cause != null - && cause instanceof TopologyBuilderException - && cause.getMessage().equals("Invalid topology building: Processor " + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME)) { - throw (TopologyBuilderException) cause; - } else { - throw new RuntimeException("Did expect different exception. Did catch:", e); - } + final String error = e.toString(); + final String expectedMessage = "org.apache.kafka.streams.errors.StreamsException: failed to initialize processor " + badNodeName; + + assertThat(error, equalTo(expectedMessage)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index 7a815944ecd..f67b6341f8d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -51,6 +51,7 @@ import java.util.regex.Pattern; import static org.apache.kafka.common.utils.Utils.mkList; import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -590,8 +591,7 @@ public class TopologyBuilderTest { assertEquals("appId-foo", topicConfig.name()); } - - @Test(expected = TopologyBuilderException.class) + @Test public void shouldThroughOnUnassignedStateStoreAccess() throws Exception { final String sourceNodeName = "source"; final String goodNodeName = "goodGuy"; @@ -603,27 +603,24 @@ public class TopologyBuilderTest { config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); final StreamsConfig streamsConfig = new StreamsConfig(config); - try { - final TopologyBuilder builder = new TopologyBuilder(); - builder - .addSource(sourceNodeName, "topic") - .addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName) + final TopologyBuilder builder = new TopologyBuilder(); + builder.addSource(sourceNodeName, "topic") + .addProcessor(goodNodeName, new LocalMockProcessorSupplier(), + sourceNodeName) .addStateStore( - Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(), - goodNodeName) - .addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName); - + Stores.create(LocalMockProcessorSupplier.STORE_NAME) + .withStringKeys().withStringValues().inMemory() + .build(), goodNodeName) + .addProcessor(badNodeName, new LocalMockProcessorSupplier(), + sourceNodeName); + try { final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder.internalTopologyBuilder); - driver.process("topic", null, null); + fail("Should have thrown StreamsException"); } catch (final StreamsException e) { - final Throwable cause = e.getCause(); - if (cause != null - && cause instanceof TopologyBuilderException - && cause.getMessage().equals("Invalid topology building: Processor " + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME)) { - throw (TopologyBuilderException) cause; - } else { - throw new RuntimeException("Did expect different exception. Did catch:", e); - } + final String error = e.toString(); + final String expectedMessage = "org.apache.kafka.streams.errors.StreamsException: failed to initialize processor " + badNodeName; + + assertThat(error, equalTo(expectedMessage)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index a39e545f513..901fc4b7e0a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -23,7 +23,6 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyDescription; import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; @@ -53,7 +52,9 @@ import java.util.regex.Pattern; import static org.apache.kafka.common.utils.Utils.mkList; import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -578,17 +579,15 @@ public class InternalTopologyBuilderTest { Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(), goodNodeName); builder.addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName); - + try { new ProcessorTopologyTestDriver(streamsConfig, builder); fail("Should have throw StreamsException"); - } catch (final StreamsException expected) { - final Throwable cause = expected.getCause(); - if (cause == null - || !(cause instanceof TopologyBuilderException) - || !cause.getMessage().equals("Invalid topology building: Processor " + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME)) { - throw new RuntimeException("Did expect different exception. Did catch:", expected); - } + } catch (final StreamsException e) { + final String error = e.toString(); + final String expectedMessage = "org.apache.kafka.streams.errors.StreamsException: failed to initialize processor " + badNodeName; + + assertThat(error, equalTo(expectedMessage)); } }