diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java index 1d30f9edd95..dcf46581b91 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java @@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory; *
* This class is shared by the client thread (for partitioning) and the background sender thread. * - * Metadata is maintained for only a subset of topics, which can be added to over time. When we request metdata for a + * Metadata is maintained for only a subset of topics, which can be added to over time. When we request metadata for a * topic we don't have any metadata for it will trigger a metadata update. */ public final class Metadata { @@ -99,12 +99,17 @@ public final class Metadata { /** * Wait for metadata update until the current version is larger than the last version we know of */ - public synchronized void awaitUpdate(int lastVerison, long maxWaitMs) { + public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) { + if (maxWaitMs < 0) { + throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds"); + } long begin = System.currentTimeMillis(); long remainingWaitMs = maxWaitMs; - while (this.version <= lastVerison) { + while (this.version <= lastVersion) { try { - wait(remainingWaitMs); + if (remainingWaitMs != 0) { + wait(remainingWaitMs); + } } catch (InterruptedException e) { /* this is fine */ } long elapsed = System.currentTimeMillis() - begin; diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java index 4547bfcb44b..74605c38cfd 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java @@ -3,24 +3,23 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ package org.apache.kafka.clients.producer; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - import org.apache.kafka.clients.producer.internals.Metadata; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.test.TestUtils; import org.junit.Test; +import static org.junit.Assert.*; + public class MetadataTest { private long refreshBackoffMs = 100; @@ -49,13 +48,42 @@ public class MetadataTest { assertTrue("Update needed due to stale metadata.", metadata.timeToNextUpdate(time) == 0); } + /** + * Tests that {@link org.apache.kafka.clients.producer.internals.Metadata#awaitUpdate(int, long)} doesn't + * wait forever with a max timeout value of 0 + * + * @throws Exception + * @see https://issues.apache.org/jira/browse/KAFKA-1836 + */ + @Test + public void testMetadataUpdateWaitTime() throws Exception { + long time = 0; + metadata.update(Cluster.empty(), time); + assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0); + // first try with a max wait time of 0 and ensure that this returns back without waiting forever + try { + metadata.awaitUpdate(metadata.requestUpdate(), 0); + fail("Wait on metadata update was expected to timeout, but it didn't"); + } catch (TimeoutException te) { + // expected + } + // now try with a higher timeout value once + final long TWO_SECOND_WAIT = 2000; + try { + metadata.awaitUpdate(metadata.requestUpdate(), TWO_SECOND_WAIT); + fail("Wait on metadata update was expected to timeout, but it didn't"); + } catch (TimeoutException te) { + // expected + } + } + private Thread asyncFetch(final String topic) { Thread thread = new Thread() { public void run() { while (metadata.fetch().partitionsForTopic(topic) == null) { try { metadata.awaitUpdate(metadata.requestUpdate(), refreshBackoffMs); - } catch(TimeoutException e) { + } catch (TimeoutException e) { // let it go } } @@ -64,5 +92,4 @@ public class MetadataTest { thread.start(); return thread; } - }