Browse Source

KAFKA-4312: If filePath is empty string writeAsText should have more meaningful error message

…eaningful error message

Author: bbejeck <bbejeck@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #2042 from bbejeck/KAFKA-4312_write_as_text_throws_NPE_empty_string
pull/1723/merge
Bill Bejeck 8 years ago committed by Guozhang Wang
parent
commit
b51002c576
  1. 3
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
  2. 4
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
  3. 6
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
  4. 6
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java

3
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java

@ -207,6 +207,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @@ -207,6 +207,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public void writeAsText(String filePath, String streamName, Serde<K> keySerde, Serde<V> valSerde) {
Objects.requireNonNull(filePath, "filePath can't be null");
if (filePath.trim().isEmpty()) {
throw new TopologyBuilderException("filePath can't be an empty string");
}
String name = topology.newName(PRINTING_NAME);
streamName = (streamName == null) ? this.name : streamName;
try {

4
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java

@ -189,6 +189,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, @@ -189,6 +189,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
*/
@Override
public void writeAsText(String filePath, String streamName, Serde<K> keySerde, Serde<V> valSerde) {
Objects.requireNonNull(filePath, "filePath can't be null");
if (filePath.trim().isEmpty()) {
throw new TopologyBuilderException("filePath can't be an empty string");
}
String name = topology.newName(PRINTING_NAME);
streamName = (streamName == null) ? this.name : streamName;
try {

6
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java

@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals; @@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
@ -183,6 +184,11 @@ public class KStreamImplTest { @@ -183,6 +184,11 @@ public class KStreamImplTest {
testStream.writeAsText(null);
}
@Test(expected = TopologyBuilderException.class)
public void shouldNotAllowEmptyFilePathOnWriteAsText() throws Exception {
testStream.writeAsText("\t \t");
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullMapperOnFlatMap() throws Exception {
testStream.flatMap(null);

6
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java

@ -20,6 +20,7 @@ package org.apache.kafka.streams.kstream.internals; @@ -20,6 +20,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Predicate;
@ -402,6 +403,11 @@ public class KTableImplTest { @@ -402,6 +403,11 @@ public class KTableImplTest {
table.writeAsText(null);
}
@Test(expected = TopologyBuilderException.class)
public void shouldNotAllowEmptyFilePathOnWriteAsText() throws Exception {
table.writeAsText("\t \t");
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullActionOnForEach() throws Exception {
table.foreach(null);

Loading…
Cancel
Save