Browse Source

KAFKA-1836 metadata.fetch.timeout.ms set to zero blocks forever; reviewed by Neha Narkhede and Ewen Cheslack-Postava

pull/38/merge
Jaikiran Pai 10 years ago committed by Neha Narkhede
parent
commit
ad4883a0cd
  1. 13
      clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
  2. 41
      clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java

13
clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java

@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory;
* <p> * <p>
* This class is shared by the client thread (for partitioning) and the background sender thread. * This class is shared by the client thread (for partitioning) and the background sender thread.
* *
* Metadata is maintained for only a subset of topics, which can be added to over time. When we request metdata for a * Metadata is maintained for only a subset of topics, which can be added to over time. When we request metadata for a
* topic we don't have any metadata for it will trigger a metadata update. * topic we don't have any metadata for it will trigger a metadata update.
*/ */
public final class Metadata { public final class Metadata {
@ -99,12 +99,17 @@ public final class Metadata {
/** /**
* Wait for metadata update until the current version is larger than the last version we know of * Wait for metadata update until the current version is larger than the last version we know of
*/ */
public synchronized void awaitUpdate(int lastVerison, long maxWaitMs) { public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) {
if (maxWaitMs < 0) {
throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
}
long begin = System.currentTimeMillis(); long begin = System.currentTimeMillis();
long remainingWaitMs = maxWaitMs; long remainingWaitMs = maxWaitMs;
while (this.version <= lastVerison) { while (this.version <= lastVersion) {
try { try {
wait(remainingWaitMs); if (remainingWaitMs != 0) {
wait(remainingWaitMs);
}
} catch (InterruptedException e) { /* this is fine */ } catch (InterruptedException e) { /* this is fine */
} }
long elapsed = System.currentTimeMillis() - begin; long elapsed = System.currentTimeMillis() - begin;

41
clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java

@ -3,24 +3,23 @@
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at * License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License. * specific language governing permissions and limitations under the License.
*/ */
package org.apache.kafka.clients.producer; package org.apache.kafka.clients.producer;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.kafka.clients.producer.internals.Metadata; import org.apache.kafka.clients.producer.internals.Metadata;
import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.test.TestUtils; import org.apache.kafka.test.TestUtils;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.*;
public class MetadataTest { public class MetadataTest {
private long refreshBackoffMs = 100; private long refreshBackoffMs = 100;
@ -49,13 +48,42 @@ public class MetadataTest {
assertTrue("Update needed due to stale metadata.", metadata.timeToNextUpdate(time) == 0); assertTrue("Update needed due to stale metadata.", metadata.timeToNextUpdate(time) == 0);
} }
/**
* Tests that {@link org.apache.kafka.clients.producer.internals.Metadata#awaitUpdate(int, long)} doesn't
* wait forever with a max timeout value of 0
*
* @throws Exception
* @see https://issues.apache.org/jira/browse/KAFKA-1836
*/
@Test
public void testMetadataUpdateWaitTime() throws Exception {
long time = 0;
metadata.update(Cluster.empty(), time);
assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
// first try with a max wait time of 0 and ensure that this returns back without waiting forever
try {
metadata.awaitUpdate(metadata.requestUpdate(), 0);
fail("Wait on metadata update was expected to timeout, but it didn't");
} catch (TimeoutException te) {
// expected
}
// now try with a higher timeout value once
final long TWO_SECOND_WAIT = 2000;
try {
metadata.awaitUpdate(metadata.requestUpdate(), TWO_SECOND_WAIT);
fail("Wait on metadata update was expected to timeout, but it didn't");
} catch (TimeoutException te) {
// expected
}
}
private Thread asyncFetch(final String topic) { private Thread asyncFetch(final String topic) {
Thread thread = new Thread() { Thread thread = new Thread() {
public void run() { public void run() {
while (metadata.fetch().partitionsForTopic(topic) == null) { while (metadata.fetch().partitionsForTopic(topic) == null) {
try { try {
metadata.awaitUpdate(metadata.requestUpdate(), refreshBackoffMs); metadata.awaitUpdate(metadata.requestUpdate(), refreshBackoffMs);
} catch(TimeoutException e) { } catch (TimeoutException e) {
// let it go // let it go
} }
} }
@ -64,5 +92,4 @@ public class MetadataTest {
thread.start(); thread.start();
return thread; return thread;
} }
} }

Loading…
Cancel
Save