From b46cb3b2975afd8a3e82a0265c57760d8b9910da Mon Sep 17 00:00:00 2001 From: Edward Ribeiro Date: Tue, 6 Oct 2015 13:39:17 -0700 Subject: [PATCH] KAFKA-2599: Fix Metadata.getClusterForCurrentTopics throws NPE MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …h null checking Author: Edward Ribeiro Reviewers: Ismael Juma, Guozhang Wang Closes #262 from eribeiro/KAFKA-2599 --- .../src/main/java/org/apache/kafka/clients/Metadata.java | 6 +++++- .../test/java/org/apache/kafka/clients/MetadataTest.java | 3 +++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 7d4ffa79774..f2fca12e09d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -14,10 +14,12 @@ package org.apache.kafka.clients; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.errors.TimeoutException; import org.slf4j.Logger; @@ -237,11 +239,13 @@ public final class Metadata { private Cluster getClusterForCurrentTopics(Cluster cluster) { Collection partitionInfos = new ArrayList<>(); + List nodes = Collections.emptyList(); if (cluster != null) { for (String topic : this.topics) { partitionInfos.addAll(cluster.partitionsForTopic(topic)); } + nodes = cluster.nodes(); } - return new Cluster(cluster.nodes(), partitionInfos); + return new Cluster(nodes, partitionInfos); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java index c42c7bcf362..b7160a1996e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java @@ -111,6 +111,9 @@ public class MetadataTest { assertEquals(100, metadata.timeToNextUpdate(1100)); assertEquals(100, metadata.lastSuccessfulUpdate()); + metadata.needMetadataForAllTopics(true); + metadata.update(null, time); + assertEquals(100, metadata.timeToNextUpdate(1000)); } @Test