@ -59,8 +59,8 @@ public class MetadataTest {
time + = refreshBackoffMs ;
time + = refreshBackoffMs ;
assertTrue ( "Update needed now that backoff time expired" , metadata . timeToNextUpdate ( time ) = = 0 ) ;
assertTrue ( "Update needed now that backoff time expired" , metadata . timeToNextUpdate ( time ) = = 0 ) ;
String topic = "my-topic" ;
String topic = "my-topic" ;
Thread t1 = asyncFetch ( topic ) ;
Thread t1 = asyncFetch ( topic , 500 ) ;
Thread t2 = asyncFetch ( topic ) ;
Thread t2 = asyncFetch ( topic , 500 ) ;
assertTrue ( "Awaiting update" , t1 . isAlive ( ) ) ;
assertTrue ( "Awaiting update" , t1 . isAlive ( ) ) ;
assertTrue ( "Awaiting update" , t2 . isAlive ( ) ) ;
assertTrue ( "Awaiting update" , t2 . isAlive ( ) ) ;
// Perform metadata update when an update is requested on the async fetch thread
// Perform metadata update when an update is requested on the async fetch thread
@ -307,12 +307,12 @@ public class MetadataTest {
assertTrue ( "Unused topic expired when expiry disabled" , metadata . containsTopic ( "topic4" ) ) ;
assertTrue ( "Unused topic expired when expiry disabled" , metadata . containsTopic ( "topic4" ) ) ;
}
}
private Thread asyncFetch ( final String topic ) {
private Thread asyncFetch ( final String topic , final long maxWaitMs ) {
Thread thread = new Thread ( ) {
Thread thread = new Thread ( ) {
public void run ( ) {
public void run ( ) {
while ( metadata . fetch ( ) . partitionsForTopic ( topic ) = = null ) {
while ( metadata . fetch ( ) . partitionsForTopic ( topic ) = = null ) {
try {
try {
metadata . awaitUpdate ( metadata . requestUpdate ( ) , refreshBackoff Ms) ;
metadata . awaitUpdate ( metadata . requestUpdate ( ) , maxWait Ms) ;
} catch ( Exception e ) {
} catch ( Exception e ) {
backgroundError . set ( e . toString ( ) ) ;
backgroundError . set ( e . toString ( ) ) ;
}
}