
KAFKA-9048 Pt1: Remove Unnecessary lookup in Fetch Building (#7576)

Get rid of partitionStates(), which creates a new PartitionState object for every entry on each call, since none of its callers require a Seq.
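In caller terms the change looks roughly like this. A minimal runnable sketch against the PartitionStates API, written in the style of the Scala callers; the object name FetchStateWalk and the partition names are made up:

import java.util.{LinkedHashMap => JLinkedHashMap}

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.PartitionStates

import scala.collection.JavaConverters._

object FetchStateWalk extends App {
  val partitionStates = new PartitionStates[String]()
  val initial = new JLinkedHashMap[TopicPartition, String]()
  initial.put(new TopicPartition("foo", 0), "fetching")
  initial.put(new TopicPartition("foo", 1), "truncating")
  partitionStates.set(initial)

  // Before this PR, callers wrote
  //   partitionStates.partitionStates.asScala.foreach { state => ... }
  // which built a List plus one PartitionState wrapper per entry on every
  // call. Now the backing map view is read directly (insertion order is
  // preserved), with no per-entry allocation:
  partitionStates.partitionStateMap.asScala.foreach { case (tp, state) =>
    println(s"$tp -> $state")
  }
}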

Modify ReplicaFetcherThread constructor to fix the broken benchmark path.

This PR:

Benchmark                                  (partitionCount)  Mode  Cnt        Score       Error  Units
ReplicaFetcherThreadBenchmark.testFetcher               100  avgt   15     9280.953 ±    55.967  ns/op
ReplicaFetcherThreadBenchmark.testFetcher               500  avgt   15    61533.546 ±  1213.559  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt   15   151306.146 ±  1820.222  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt   15  1138547.929 ± 45301.938  ns/op

Trunk:

Benchmark                                  (partitionCount)  Mode  Cnt        Score       Error  Units
ReplicaFetcherThreadBenchmark.testFetcher               100  avgt   15     9305.588 ±    51.886  ns/op
ReplicaFetcherThreadBenchmark.testFetcher               500  avgt   15    65216.933 ±   939.827  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt   15   151715.514 ±  1361.009  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt   15  1231958.103 ± 94

At 5000 partitions this works out to roughly a 7.6% reduction in time per fetch-building pass (1138548 vs 1231958 ns/op); at 500 partitions the improvement is about 5.6%, and the 100 and 1000 partition results are within the error bars.


Reviewers: Jason Gustafson <jason@confluent.io>, Lucas Bradstreet <lucasbradstreet@gmail.com>
Committed by Guozhang Wang (via GitHub), commit 59a75f4422
  1. clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java (11 changes)
  2. clients/src/test/java/org/apache/kafka/common/internals/PartitionStatesTest.java (13 changes)
  3. core/src/main/scala/kafka/server/AbstractFetcherThread.scala (29 changes)
  4. jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java (12 changes)

clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java (11 changes)

@@ -85,17 +85,6 @@ public class PartitionStates<S> {
         return map.containsKey(topicPartition);
     }
 
-    /**
-     * Returns the partition states in order.
-     */
-    public List<PartitionState<S>> partitionStates() {
-        List<PartitionState<S>> result = new ArrayList<>(map.size());
-        for (Map.Entry<TopicPartition, S> entry : map.entrySet()) {
-            result.add(new PartitionState<>(entry.getKey(), entry.getValue()));
-        }
-        return result;
-    }
-
     public Stream<PartitionState<S>> stream() {
         return map.entrySet().stream().map(entry -> new PartitionState<>(entry.getKey(), entry.getValue()));
     }

clients/src/test/java/org/apache/kafka/common/internals/PartitionStatesTest.java (13 changes)

@@ -22,10 +22,8 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 public class PartitionStatesTest {
@@ -43,8 +41,8 @@ public class PartitionStatesTest {
         expected.put(new TopicPartition("baz", 3), "baz 3");
         checkState(states, expected);
 
-        states.set(new LinkedHashMap<TopicPartition, String>());
-        checkState(states, new LinkedHashMap<TopicPartition, String>());
+        states.set(new LinkedHashMap<>());
+        checkState(states, new LinkedHashMap<>());
     }
 
     private LinkedHashMap<TopicPartition, String> createMap() {
@@ -61,12 +59,7 @@ public class PartitionStatesTest {
     private void checkState(PartitionStates<String> states, LinkedHashMap<TopicPartition, String> expected) {
         assertEquals(expected.keySet(), states.partitionSet());
         assertEquals(expected.size(), states.size());
-        List<PartitionStates.PartitionState<String>> statesList = new ArrayList<>();
-        for (Map.Entry<TopicPartition, String> entry : expected.entrySet()) {
-            statesList.add(new PartitionStates.PartitionState<>(entry.getKey(), entry.getValue()));
-            assertTrue(states.contains(entry.getKey()));
-        }
-        assertEquals(statesList, states.partitionStates());
+        assertEquals(expected, states.partitionStateMap());
     }
 
     @Test

core/src/main/scala/kafka/server/AbstractFetcherThread.scala (29 changes)

@@ -31,7 +31,7 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.CoreUtils.inLock
 import org.apache.kafka.common.protocol.Errors
 
-import scala.collection.{mutable, Map, Seq, Set}
+import scala.collection.{mutable, Map, Set}
 import scala.collection.JavaConverters._
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicLong
@@ -214,10 +214,10 @@ abstract class AbstractFetcherThread(name: String,
       //Check no leadership and no leader epoch changes happened whilst we were unlocked, fetching epochs
       val epochEndOffsets = endOffsets.filter { case (tp, _) =>
         val curPartitionState = partitionStates.stateValue(tp)
-        val partitionEpochRequest = latestEpochsForPartitions.get(tp).getOrElse {
+        val partitionEpochRequest = latestEpochsForPartitions.getOrElse(tp, {
           throw new IllegalStateException(
             s"Leader replied with partition $tp not requested in OffsetsForLeaderEpoch request")
-        }
+        })
         val leaderEpochInRequest = partitionEpochRequest.currentLeaderEpoch.get
         curPartitionState != null && leaderEpochInRequest == curPartitionState.currentLeaderEpoch
       }
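The getOrElse rewrite above is a small companion cleanup: the semantics are identical, but the lookup reads as one call instead of two, and Map implementations that override getOrElse can serve a hit without allocating the intermediate Option that get always returns. A standalone illustration (the LookupStyles object is hypothetical, not the thread's code):

import org.apache.kafka.common.TopicPartition

object LookupStyles extends App {
  val latestEpochs = Map(new TopicPartition("t", 0) -> 5)
  val tp = new TopicPartition("t", 0)

  // Before: Option-returning get, then getOrElse on that Option.
  val viaOption = latestEpochs.get(tp).getOrElse(throw new IllegalStateException(s"$tp missing"))

  // After: a single getOrElse call with a by-name default that only runs on a miss.
  val direct = latestEpochs.getOrElse(tp, throw new IllegalStateException(s"$tp missing"))

  assert(viaOption == direct)
}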
@@ -435,18 +435,17 @@ abstract class AbstractFetcherThread(name: String,
   * @param fetchOffsets the partitions to update fetch offset and maybe mark truncation complete
   */
  private def updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets: Map[TopicPartition, OffsetTruncationState]): Unit = {
-    val newStates: Map[TopicPartition, PartitionFetchState] = partitionStates.partitionStates.asScala
-      .map { state =>
-        val currentFetchState = state.value
-        val maybeTruncationComplete = fetchOffsets.get(state.topicPartition) match {
+    val newStates: Map[TopicPartition, PartitionFetchState] = partitionStates.partitionStateMap.asScala
+      .map { case (topicPartition, currentFetchState) =>
+        val maybeTruncationComplete = fetchOffsets.get(topicPartition) match {
          case Some(offsetTruncationState) =>
            val state = if (offsetTruncationState.truncationCompleted) Fetching else Truncating
            PartitionFetchState(offsetTruncationState.offset, currentFetchState.lag,
              currentFetchState.currentLeaderEpoch, currentFetchState.delay, state)
          case None => currentFetchState
        }
-        (state.topicPartition, maybeTruncationComplete)
-      }.toMap
+        (topicPartition, maybeTruncationComplete)
+      }
    partitionStates.set(newStates.asJava)
  }
@@ -648,12 +647,12 @@ abstract class AbstractFetcherThread(name: String,
   }
 
   private[server] def partitionsAndOffsets: Map[TopicPartition, InitialFetchState] = inLock(partitionMapLock) {
-    partitionStates.partitionStates.asScala.map { state =>
+    partitionStates.partitionStateMap.asScala.map { case (topicPartition, currentFetchState) =>
       val initialFetchState = InitialFetchState(sourceBroker,
-        currentLeaderEpoch = state.value.currentLeaderEpoch,
-        initOffset = state.value.fetchOffset)
-      state.topicPartition -> initialFetchState
-    }.toMap
+        currentLeaderEpoch = currentFetchState.currentLeaderEpoch,
+        initOffset = currentFetchState.fetchOffset)
+      topicPartition -> initialFetchState
+    }
   }
 
   protected def toMemoryRecords(records: Records): MemoryRecords = {
@@ -793,7 +792,7 @@ case class OffsetTruncationState(offset: Long, truncationCompleted: Boolean) {
 
   def this(offset: Long) = this(offset, true)
 
-  override def toString = "offset:%d-truncationCompleted:%b".format(offset, truncationCompleted)
+  override def toString: String = "offset:%d-truncationCompleted:%b".format(offset, truncationCompleted)
 }
 
 case class OffsetAndEpoch(offset: Long, leaderEpoch: Int) {

jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java (12 changes)

@@ -36,6 +36,7 @@ import kafka.server.MetadataCache;
 import kafka.server.OffsetAndEpoch;
 import kafka.server.OffsetTruncationState;
 import kafka.server.ReplicaFetcherThread;
+import kafka.server.ReplicaManager;
 import kafka.server.ReplicaQuota;
 import kafka.server.checkpoints.OffsetCheckpoints;
 import kafka.utils.KafkaScheduler;
@@ -176,7 +177,9 @@ public class ReplicaFetcherThreadBenchmark {
                     new LinkedList<>(), fetched));
         }
 
-        fetcher = new ReplicaFetcherBenchThread(config, pool);
+        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
+        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(brokerTopicStats);
+        fetcher = new ReplicaFetcherBenchThread(config, replicaManager, pool);
         fetcher.addPartitions(offsetAndEpochs);
         // force a pass to move partitions to fetching state. We do this in the setup phase
         // so that we do not measure this time as part of the steady state work
@@ -230,13 +233,16 @@ public class ReplicaFetcherThreadBenchmark {
     static class ReplicaFetcherBenchThread extends ReplicaFetcherThread {
         private final Pool<TopicPartition, Partition> pool;
 
-        ReplicaFetcherBenchThread(KafkaConfig config, Pool<TopicPartition, Partition> partitions) {
+        ReplicaFetcherBenchThread(KafkaConfig config,
+                                  ReplicaManager replicaManager,
+                                  Pool<TopicPartition,
+                                          Partition> partitions) {
             super("name",
                     3,
                     new BrokerEndPoint(3, "host", 3000),
                     config,
                     new FailedPartitions(),
-                    null,
+                    replicaManager,
                     new Metrics(),
                     Time.SYSTEM,
                     new ReplicaQuota() {
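The constructor change exists because the benchmark previously passed null for the ReplicaManager argument, and the fetch path evidently dereferences it for brokerTopicStats; the fix stubs exactly that accessor. The same Mockito pattern in miniature, with hypothetical Stats/Manager/Fetcher classes standing in for the real ones:

import org.mockito.Mockito

class Stats { def recordBytes(n: Long): Unit = () }
class Manager { def stats: Stats = new Stats }
class Fetcher(manager: Manager) {
  // Throws NullPointerException if manager is null, as the old wiring did.
  def fetchOnce(): Unit = manager.stats.recordBytes(1024)
}

object MockWiring extends App {
  // Stub only the accessor the code under test actually touches.
  val manager = Mockito.mock(classOf[Manager])
  Mockito.when(manager.stats).thenReturn(new Stats)
  new Fetcher(manager).fetchOnce()
}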
