@ -32,6 +32,7 @@ import org.apache.kafka.common.TopicPartitionInfo;
@@ -32,6 +32,7 @@ import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.NotEnoughReplicasException ;
import org.apache.kafka.common.errors.TimeoutException ;
import org.apache.kafka.common.errors.TopicExistsException ;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException ;
import org.apache.kafka.common.internals.KafkaFutureImpl ;
import org.apache.kafka.common.utils.Time ;
import org.apache.kafka.common.utils.Utils ;
@ -134,35 +135,12 @@ public final class WorkerUtils {
@@ -134,35 +135,12 @@ public final class WorkerUtils {
}
}
/ * *
* Returns a list of all existing topic partitions that match the following criteria : topic
* name matches give regular expression ' topicRegex ' , topic is not internal , partitions are
* in range [ startPartition , endPartition ]
*
* @param log The logger to use .
* @param bootstrapServers The bootstrap server list .
* @param topicRegex Topic name regular expression
* @param startPartition Starting partition of partition range
* @param endPartition Ending partition of partition range
* @return List of topic partitions
* @throws Throwable If getting list of topics or their descriptions fails .
* /
public static Collection < TopicPartition > getMatchingTopicPartitions (
Logger log , String bootstrapServers ,
Map < String , String > commonClientConf , Map < String , String > adminClientConf ,
String topicRegex , int startPartition , int endPartition ) throws Throwable {
try ( AdminClient adminClient
= createAdminClient ( bootstrapServers , commonClientConf , adminClientConf ) ) {
return getMatchingTopicPartitions ( adminClient , topicRegex , startPartition , endPartition ) ;
} catch ( Exception e ) {
log . warn ( "Failed to get topic partitions matching {}" , topicRegex , e ) ;
throw e ;
}
}
/ * *
* The actual create topics functionality is separated into this method and called from the
* above method to be able to unit test with mock adminClient .
* @throws TopicExistsException if the specified topic already exists .
* @throws UnknownTopicOrPartitionException if topic creation was issued but failed to verify if it was created .
* @throws Throwable if creation of one or more topics fails ( except for the cases above ) .
* /
static void createTopics (
Logger log , AdminClient adminClient ,
@ -258,6 +236,8 @@ public final class WorkerUtils {
@@ -258,6 +236,8 @@ public final class WorkerUtils {
* @param topicsToVerify List of topics to verify
* @param topicsInfo Map of topic name to topic description , which includes topics in
* ' topicsToVerify ' list .
* @throws UnknownTopicOrPartitionException If at least one topic contained in ' topicsInfo '
* does not exist
* @throws RuntimeException If one or more topics have different number of partitions than
* described in ' topicsInfo '
* /