|
|
|
@ -114,7 +114,7 @@ object VerifyConsumerRebalance extends Logging {
@@ -114,7 +114,7 @@ object VerifyConsumerRebalance extends Logging {
|
|
|
|
|
// check if the owner is a valid consumer id |
|
|
|
|
consumerIdsForTopic match { |
|
|
|
|
case Some(consumerIds) => |
|
|
|
|
if(!consumerIds.contains(partitionOwner)) { |
|
|
|
|
if(!consumerIds.map(c => c.toString).contains(partitionOwner)) { |
|
|
|
|
error(("Owner %s for partition [%s,%d] is not a valid member of consumer " + |
|
|
|
|
"group %s").format(partitionOwner, topic, partition, group)) |
|
|
|
|
rebalanceSucceeded = false |
|
|
|
|