From 0c25c73782e6e70b8f37e3dda2fa2a5b0b1c8c65 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Tue, 27 Sep 2016 17:35:24 -0700 Subject: [PATCH] HOTFIX: fix npe in StreamsMetadataState when onChange has not been called If some StreamsMetadataState methods are called before the onChange method is called a NullPointerException was being thrown. Added null check for cluster in isInitialized method Author: Damian Guy Reviewers: Guozhang Wang Closes #1920 from dguy/fix-npe-streamsmetadata --- .../streams/processor/internals/StreamsMetadataState.java | 2 +- .../processor/internals/StreamsMetadataStateTest.java | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java index 6f9bea69af2..ccb2cdafeeb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java @@ -234,7 +234,7 @@ public class StreamsMetadataState { } private boolean isInitialized() { - return !clusterMetadata.topics().isEmpty(); + return clusterMetadata != null && !clusterMetadata.topics().isEmpty(); } private class SourceTopicsInfo { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java index 03280a831a5..411e02db9b9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java @@ -114,6 +114,11 @@ public class StreamsMetadataStateTest { discovery.onChange(hostToPartitions, cluster); } + @Test + public void shouldNotThrowNPEWhenOnChangeNotCalled() throws Exception { + new StreamsMetadataState(builder).getAllMetadataForStore("store"); + } + @Test public void shouldGetAllStreamInstances() throws Exception { final StreamsMetadata one = new StreamsMetadata(hostOne, Utils.mkSet("table-one", "table-two", "merged-table"),