From b51002c576ea9758132d75a8a0fe454e1bc270a2 Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Wed, 19 Oct 2016 14:29:53 -0700 Subject: [PATCH] KAFKA-4312: If filePath is empty string writeAsText should have more meaningful error message MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …eaningful error message Author: bbejeck Reviewers: Guozhang Wang Closes #2042 from bbejeck/KAFKA-4312_write_as_text_throws_NPE_empty_string --- .../apache/kafka/streams/kstream/internals/KStreamImpl.java | 3 +++ .../apache/kafka/streams/kstream/internals/KTableImpl.java | 4 ++++ .../kafka/streams/kstream/internals/KStreamImplTest.java | 6 ++++++ .../kafka/streams/kstream/internals/KTableImplTest.java | 6 ++++++ 4 files changed, 19 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index b6c3401d758..bb77e963655 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -207,6 +207,9 @@ public class KStreamImpl extends AbstractStream implements KStream keySerde, Serde valSerde) { Objects.requireNonNull(filePath, "filePath can't be null"); + if (filePath.trim().isEmpty()) { + throw new TopologyBuilderException("filePath can't be an empty string"); + } String name = topology.newName(PRINTING_NAME); streamName = (streamName == null) ? this.name : streamName; try { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index c53e761bfb0..fc1c07674c8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -189,6 +189,10 @@ public class KTableImpl extends AbstractStream implements KTable keySerde, Serde valSerde) { + Objects.requireNonNull(filePath, "filePath can't be null"); + if (filePath.trim().isEmpty()) { + throw new TopologyBuilderException("filePath can't be an empty string"); + } String name = topology.newName(PRINTING_NAME); streamName = (streamName == null) ? this.name : streamName; try { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index fb2afec6752..e5e334c48b4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; @@ -183,6 +184,11 @@ public class KStreamImplTest { testStream.writeAsText(null); } + @Test(expected = TopologyBuilderException.class) + public void shouldNotAllowEmptyFilePathOnWriteAsText() throws Exception { + testStream.writeAsText("\t \t"); + } + @Test(expected = NullPointerException.class) public void shouldNotAllowNullMapperOnFlatMap() throws Exception { testStream.flatMap(null); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java index 4b9ea06b45d..afa1033d732 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java @@ -20,6 +20,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Predicate; @@ -402,6 +403,11 @@ public class KTableImplTest { table.writeAsText(null); } + @Test(expected = TopologyBuilderException.class) + public void shouldNotAllowEmptyFilePathOnWriteAsText() throws Exception { + table.writeAsText("\t \t"); + } + @Test(expected = NullPointerException.class) public void shouldNotAllowNullActionOnForEach() throws Exception { table.foreach(null);