@ -965,6 +965,7 @@ public class KafkaAdminClientTest {
}
}
}
}
@Deprecated
@Test
@Test
public void testMetadataRetries ( ) throws Exception {
public void testMetadataRetries ( ) throws Exception {
// We should continue retrying on metadata update failures in spite of retry configuration
// We should continue retrying on metadata update failures in spite of retry configuration
@ -976,8 +977,7 @@ public class KafkaAdminClientTest {
try ( final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv ( Time . SYSTEM , bootstrapCluster ,
try ( final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv ( Time . SYSTEM , bootstrapCluster ,
newStrMap ( AdminClientConfig . BOOTSTRAP_SERVERS_CONFIG , "localhost:9999" ,
newStrMap ( AdminClientConfig . BOOTSTRAP_SERVERS_CONFIG , "localhost:9999" ,
AdminClientConfig . DEFAULT_API_TIMEOUT_MS_CONFIG , "10000000" ,
AdminClientConfig . DEFAULT_API_TIMEOUT_MS_CONFIG , "10000000" ,
AdminClientConfig . REQUEST_TIMEOUT_MS_CONFIG , "100" ,
AdminClientConfig . RETRIES_CONFIG , "0" ) ) ) {
AdminClientConfig . DEFAULT_API_TIMEOUT_MS_CONFIG , "100" ) ) ) {
// The first request fails with a disconnect
// The first request fails with a disconnect
env . kafkaClient ( ) . prepareResponse ( null , true ) ;
env . kafkaClient ( ) . prepareResponse ( null , true ) ;
@ -4391,15 +4391,13 @@ public class KafkaAdminClientTest {
@Test
@Test
public void testAlterReplicaLogDirsPartialFailure ( ) throws Exception {
public void testAlterReplicaLogDirsPartialFailure ( ) throws Exception {
try ( AdminClientUnitTestEnv env = mockClientEnv (
long defaultApiTimeout = 60000 ;
AdminClientConfig . REQUEST_TIMEOUT_MS_CONFIG , "100" ,
MockTime time = new MockTime ( ) ;
AdminClientConfig . DEFAULT_API_TIMEOUT_MS_CONFIG , "100" ) ) {
// As we won't retry, this calls fails immediately with a DisconnectException
try ( AdminClientUnitTestEnv env = mockClientEnv ( time ,
env . kafkaClient ( ) . prepareResponseFrom (
AdminClientConfig . DEFAULT_API_TIMEOUT_MS_CONFIG , String . valueOf ( defaultApiTimeout ) ) ) {
prepareAlterLogDirsResponse ( Errors . NONE , "topic" , 1 ) ,
env . cluster ( ) . nodeById ( 0 ) ,
true ) ;
// Provide only one prepared response from node 1
env . kafkaClient ( ) . prepareResponseFrom (
env . kafkaClient ( ) . prepareResponseFrom (
prepareAlterLogDirsResponse ( Errors . NONE , "topic" , 2 ) ,
prepareAlterLogDirsResponse ( Errors . NONE , "topic" , 2 ) ,
env . cluster ( ) . nodeById ( 1 ) ) ;
env . cluster ( ) . nodeById ( 1 ) ) ;
@ -4413,6 +4411,17 @@ public class KafkaAdminClientTest {
AlterReplicaLogDirsResult result = env . adminClient ( ) . alterReplicaLogDirs ( logDirs ) ;
AlterReplicaLogDirsResult result = env . adminClient ( ) . alterReplicaLogDirs ( logDirs ) ;
// Wait until the prepared attempt has been consumed
TestUtils . waitForCondition ( ( ) - > env . kafkaClient ( ) . numAwaitingResponses ( ) = = 0 ,
"Failed awaiting requests" ) ;
// Wait until the request is sent out
TestUtils . waitForCondition ( ( ) - > env . kafkaClient ( ) . inFlightRequestCount ( ) = = 1 ,
"Failed awaiting request" ) ;
// Advance time past the default api timeout to time out the inflight request
time . sleep ( defaultApiTimeout + 1 ) ;
TestUtils . assertFutureThrows ( result . values ( ) . get ( tpr1 ) , ApiException . class ) ;
TestUtils . assertFutureThrows ( result . values ( ) . get ( tpr1 ) , ApiException . class ) ;
assertNull ( result . values ( ) . get ( tpr2 ) . get ( ) ) ;
assertNull ( result . values ( ) . get ( tpr2 ) . get ( ) ) ;
}
}
@ -4585,14 +4594,11 @@ public class KafkaAdminClientTest {
@Test
@Test
public void testDescribeLogDirsPartialFailure ( ) throws Exception {
public void testDescribeLogDirsPartialFailure ( ) throws Exception {
try ( AdminClientUnitTestEnv env = mockClientEnv (
long defaultApiTimeout = 60000 ;
AdminClientConfig . REQUEST_TIMEOUT_MS_CONFIG , "100" ,
MockTime time = new MockTime ( ) ;
AdminClientConfig . DEFAULT_API_TIMEOUT_MS_CONFIG , "100" ) ) {
// As we won't retry, this calls fails immediately with a DisconnectException
try ( AdminClientUnitTestEnv env = mockClientEnv ( time ,
env . kafkaClient ( ) . prepareResponseFrom (
AdminClientConfig . DEFAULT_API_TIMEOUT_MS_CONFIG , String . valueOf ( defaultApiTimeout ) ) ) {
prepareDescribeLogDirsResponse ( Errors . NONE , "/data" ) ,
env . cluster ( ) . nodeById ( 0 ) ,
true ) ;
env . kafkaClient ( ) . prepareResponseFrom (
env . kafkaClient ( ) . prepareResponseFrom (
prepareDescribeLogDirsResponse ( Errors . NONE , "/data" ) ,
prepareDescribeLogDirsResponse ( Errors . NONE , "/data" ) ,
@ -4600,6 +4606,17 @@ public class KafkaAdminClientTest {
DescribeLogDirsResult result = env . adminClient ( ) . describeLogDirs ( Arrays . asList ( 0 , 1 ) ) ;
DescribeLogDirsResult result = env . adminClient ( ) . describeLogDirs ( Arrays . asList ( 0 , 1 ) ) ;
// Wait until the prepared attempt has been consumed
TestUtils . waitForCondition ( ( ) - > env . kafkaClient ( ) . numAwaitingResponses ( ) = = 0 ,
"Failed awaiting requests" ) ;
// Wait until the request is sent out
TestUtils . waitForCondition ( ( ) - > env . kafkaClient ( ) . inFlightRequestCount ( ) = = 1 ,
"Failed awaiting request" ) ;
// Advance time past the default api timeout to time out the inflight request
time . sleep ( defaultApiTimeout + 1 ) ;
TestUtils . assertFutureThrows ( result . descriptions ( ) . get ( 0 ) , ApiException . class ) ;
TestUtils . assertFutureThrows ( result . descriptions ( ) . get ( 0 ) , ApiException . class ) ;
assertNotNull ( result . descriptions ( ) . get ( 1 ) . get ( ) ) ;
assertNotNull ( result . descriptions ( ) . get ( 1 ) . get ( ) ) ;
}
}