|
|
|
@ -187,10 +187,12 @@ private object PartitionAssignorTest extends Logging {
@@ -187,10 +187,12 @@ private object PartitionAssignorTest extends Logging {
|
|
|
|
|
// check for uniform assignment |
|
|
|
|
if (verifyAssignmentIsUniform) { |
|
|
|
|
val partitionCountForStream = partitionCountPerStream(globalAssignment) |
|
|
|
|
val maxCount = partitionCountForStream.valuesIterator.max |
|
|
|
|
val minCount = partitionCountForStream.valuesIterator.min |
|
|
|
|
assertTrue("Scenario %s: assignment is not uniform (partition counts per stream are in the range [%d, %d])" |
|
|
|
|
.format(scenario, minCount, maxCount), (maxCount - minCount) <= 1) |
|
|
|
|
if (partitionCountForStream.nonEmpty) { |
|
|
|
|
val maxCount = partitionCountForStream.valuesIterator.max |
|
|
|
|
val minCount = partitionCountForStream.valuesIterator.min |
|
|
|
|
assertTrue("Scenario %s: assignment is not uniform (partition counts per stream are in the range [%d, %d])" |
|
|
|
|
.format(scenario, minCount, maxCount), (maxCount - minCount) <= 1) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|