|
|
|
@ -17,6 +17,7 @@ import static org.junit.Assert.assertTrue;
@@ -17,6 +17,7 @@ import static org.junit.Assert.assertTrue;
|
|
|
|
|
|
|
|
|
|
import org.apache.kafka.clients.producer.internals.Metadata; |
|
|
|
|
import org.apache.kafka.common.Cluster; |
|
|
|
|
import org.apache.kafka.common.errors.TimeoutException; |
|
|
|
|
import org.apache.kafka.test.TestUtils; |
|
|
|
|
import org.junit.Test; |
|
|
|
|
|
|
|
|
@ -40,9 +41,7 @@ public class MetadataTest {
@@ -40,9 +41,7 @@ public class MetadataTest {
|
|
|
|
|
Thread t2 = asyncFetch(topic); |
|
|
|
|
assertTrue("Awaiting update", t1.isAlive()); |
|
|
|
|
assertTrue("Awaiting update", t2.isAlive()); |
|
|
|
|
// keep updating the metadata until no need to
|
|
|
|
|
while (metadata.timeToNextUpdate(time) == 0) |
|
|
|
|
metadata.update(TestUtils.singletonCluster(topic, 1), time); |
|
|
|
|
metadata.update(TestUtils.singletonCluster(topic, 1), time); |
|
|
|
|
t1.join(); |
|
|
|
|
t2.join(); |
|
|
|
|
assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0); |
|
|
|
@ -53,8 +52,13 @@ public class MetadataTest {
@@ -53,8 +52,13 @@ public class MetadataTest {
|
|
|
|
|
private Thread asyncFetch(final String topic) { |
|
|
|
|
Thread thread = new Thread() { |
|
|
|
|
public void run() { |
|
|
|
|
while (metadata.fetch().partitionsForTopic(topic) == null) |
|
|
|
|
metadata.awaitUpdate(metadata.requestUpdate(), Long.MAX_VALUE); |
|
|
|
|
while (metadata.fetch().partitionsForTopic(topic) == null) { |
|
|
|
|
try { |
|
|
|
|
metadata.awaitUpdate(metadata.requestUpdate(), refreshBackoffMs); |
|
|
|
|
} catch(TimeoutException e) { |
|
|
|
|
// let it go
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
thread.start(); |
|
|
|
|