diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java index 543304c8bb7..4547bfcb44b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java @@ -17,6 +17,7 @@ import static org.junit.Assert.assertTrue; import org.apache.kafka.clients.producer.internals.Metadata; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.test.TestUtils; import org.junit.Test; @@ -40,9 +41,7 @@ public class MetadataTest { Thread t2 = asyncFetch(topic); assertTrue("Awaiting update", t1.isAlive()); assertTrue("Awaiting update", t2.isAlive()); - // keep updating the metadata until no need to - while (metadata.timeToNextUpdate(time) == 0) - metadata.update(TestUtils.singletonCluster(topic, 1), time); + metadata.update(TestUtils.singletonCluster(topic, 1), time); t1.join(); t2.join(); assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0); @@ -53,8 +52,13 @@ public class MetadataTest { private Thread asyncFetch(final String topic) { Thread thread = new Thread() { public void run() { - while (metadata.fetch().partitionsForTopic(topic) == null) - metadata.awaitUpdate(metadata.requestUpdate(), Long.MAX_VALUE); + while (metadata.fetch().partitionsForTopic(topic) == null) { + try { + metadata.awaitUpdate(metadata.requestUpdate(), refreshBackoffMs); + } catch(TimeoutException e) { + // let it go + } + } } }; thread.start();