@ -161,10 +161,11 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
@@ -161,10 +161,11 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
val consumerGroupService : ConsumerGroupService = prepareConsumerGroupService
val consumer = createConsumer ( )
consumer . subscribe ( List ( topic ) . asJava )
try {
consumer . subscribe ( List ( topic ) . asJava )
verifyAuthenticationException ( consumerGroupService . listGroups ( ) )
consumerGroupService . close ( )
verifyAuthenticationException ( consumerGroupService . listGroups ( ) )
} finally consumerGroupService . close ( )
}
@Test
@ -173,19 +174,23 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
@@ -173,19 +174,23 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
val consumerGroupService : ConsumerGroupService = prepareConsumerGroupService
val consumer = createConsumer ( )
consumer . subscribe ( List ( topic ) . asJava )
try {
consumer . subscribe ( List ( topic ) . asJava )
verifyWithRetry ( consumer . poll ( Duration . ofMillis ( 1000 ) ) )
assertEquals ( 1 , consumerGroupService . listConsumerGroups ( ) . size )
consumerGroupService . close ( )
verifyWithRetry ( consumer . poll ( Duration . ofMillis ( 1000 ) ) )
assertEquals ( 1 , consumerGroupService . listConsumerGroups ( ) . size )
}
finally consumerGroupService . close ( )
}
private def prepareConsumerGroupService = {
val propsFile = TestUtils . tempFile ( )
val propsStream = Files . newOutputStream ( propsFile . toPath )
propsStream . write ( "security.protocol=SASL_PLAINTEXT\n" . getBytes ( ) )
propsStream . write ( s" sasl.mechanism= $kafkaClientSaslMechanism " . getBytes ( ) )
propsStream . close ( )
try {
propsStream . write ( "security.protocol=SASL_PLAINTEXT\n" . getBytes ( ) )
propsStream . write ( s" sasl.mechanism= $kafkaClientSaslMechanism " . getBytes ( ) )
}
finally propsStream . close ( )
val cgcArgs = Array ( "--bootstrap-server" , brokerList ,
"--describe" ,